Skip to main content

uni_query/query/df_graph/
apply.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Apply (correlated subquery) execution plan for DataFusion.
5//!
6//! Implements `CALL { ... }` subqueries by executing the subquery once per
7//! input row, injecting the input row's columns as parameters, and cross-joining
8//! the results.
9//!
10//! # Semantics
11//!
12//! For each row from the input plan:
13//! 1. Optionally filter via `input_filter`
14//! 2. Inject the input row's columns as parameters
15//! 3. Re-plan and execute the subquery with those parameters
16//! 4. Cross-join: merge each subquery result row with the input row
17//!
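//! If input produces zero rows (after filtering), execute the subquery once
//! with the base parameters (standalone CALL support). The exceptions are unit
//! subqueries (no RETURN columns) and subqueries that contain writes: for those
//! nothing is executed and an empty batch is returned.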
20
21use crate::query::df_graph::common::{
22    arrow_err, collect_all_partitions, compute_plan_properties, execute_subplan, extract_row_params,
23};
24use crate::query::df_graph::{GraphExecutionContext, MutationContext};
25use crate::query::planner::LogicalPlan;
26use arrow_array::builder::{
27    BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder, UInt64Builder,
28};
29use arrow_array::{ArrayRef, RecordBatch};
30use arrow_schema::{DataType, SchemaRef};
31use datafusion::common::Result as DFResult;
32use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
35use datafusion::prelude::SessionContext;
36use futures::Stream;
37use parking_lot::RwLock;
38use std::any::Any;
39use std::collections::{BTreeMap, HashMap};
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_common::core::schema::Schema as UniSchema;
46use uni_cypher::ast::{Expr, UnaryOp};
47use uni_store::storage::manager::StorageManager;
48
/// Apply (correlated subquery) execution plan.
///
/// The input is pre-planned as a physical plan (executed directly).
/// The subquery is stored as a **logical** plan and re-planned per row at runtime
/// with correlated parameters injected.
/// Handles both `SubqueryCall` (no input_filter) and `Apply` (with input_filter).
///
/// Each output row is laid out per `output_schema`. It consists of the
/// surviving input columns (selected by `kept_input_indices`) followed by the
/// subquery's columns.
pub struct GraphApplyExec {
    /// Physical plan for the driving input (e.g., MATCH scan).
    /// Pre-planned at construction time to preserve property context.
    /// It is not reported as a DataFusion child. The apply executes it
    /// directly, across all of its partitions.
    input_exec: Arc<dyn ExecutionPlan>,

    /// Logical plan for the correlated subquery (re-planned per row).
    subquery_plan: LogicalPlan,

    /// Optional pre-filter applied to input rows before subquery execution.
    /// Input rows for which the filter does not evaluate to true are dropped
    /// and never drive a subquery execution.
    input_filter: Option<Expr>,

    /// Graph execution context shared with sub-planners.
    graph_ctx: Arc<GraphExecutionContext>,

    /// DataFusion session context. Its task context is used to execute
    /// `input_exec`.
    session_ctx: Arc<RwLock<SessionContext>>,

    /// Storage manager for creating sub-planners.
    storage: Arc<StorageManager>,

    /// Schema for label/edge type lookups.
    schema_info: Arc<UniSchema>,

    /// Base query parameters.
    /// For regular `CALL { ... }` subqueries, each input row's values are
    /// merged on top of these. For procedure calls, row values are passed
    /// separately as outer values instead.
    params: HashMap<String, Value>,

    /// Output schema (merged: surviving input columns + subquery columns).
    /// Subquery fields override input fields of the same name.
    output_schema: SchemaRef,

    /// Indices into `input_exec.schema()` for input columns that survive the
    /// schema merge (i.e., their name is NOT also in the subquery's output).
    /// Pre-computed at construction so the per-row hot path avoids re-deriving
    /// the filter. The leading `kept_input_indices.len()` columns of
    /// `output_schema` correspond 1:1 to these input indices.
    kept_input_indices: Arc<[usize]>,

    /// Parallel to `kept_input_indices`: when `Some((var, prop))`, the kept
    /// input column `var.prop` must be refreshed from `sub_row[var]`'s Map
    /// instead of sliced from the input batch. This carries SET-mutated
    /// dotted columns from the subquery's post-SET Map across the Apply
    /// boundary so the outer plan's `RETURN v.prop` sees the updated value.
    kept_input_overrides: Arc<[Option<(String, String)>]>,

    /// Cached plan properties, derived from `output_schema` in `new`.
    properties: Arc<PlanProperties>,

    /// Outer mutation context, threaded into the per-row sub-planner so that
    /// `CALL { ... SET/CREATE/MERGE/DELETE ... }` writes route through the
    /// same transaction's L0 buffer. `None` for read-only outer plans.
    mutation_ctx: Option<Arc<MutationContext>>,

    /// Execution metrics. A `BaselineMetrics` is registered per partition in
    /// `execute`.
    metrics: ExecutionPlanMetricsSet,
}
110
111impl fmt::Debug for GraphApplyExec {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        f.debug_struct("GraphApplyExec")
114            .field("has_input_filter", &self.input_filter.is_some())
115            .finish()
116    }
117}
118
119impl GraphApplyExec {
120    /// Create a new Apply execution plan.
121    #[expect(clippy::too_many_arguments)]
122    pub fn new(
123        input_exec: Arc<dyn ExecutionPlan>,
124        subquery_plan: LogicalPlan,
125        input_filter: Option<Expr>,
126        graph_ctx: Arc<GraphExecutionContext>,
127        session_ctx: Arc<RwLock<SessionContext>>,
128        storage: Arc<StorageManager>,
129        schema_info: Arc<UniSchema>,
130        params: HashMap<String, Value>,
131        output_schema: SchemaRef,
132        kept_input_indices: Vec<usize>,
133        kept_input_overrides: Vec<Option<(String, String)>>,
134        mutation_ctx: Option<Arc<MutationContext>>,
135    ) -> Self {
136        let properties = compute_plan_properties(output_schema.clone());
137
138        Self {
139            input_exec,
140            subquery_plan,
141            input_filter,
142            graph_ctx,
143            session_ctx,
144            storage,
145            schema_info,
146            params,
147            output_schema,
148            kept_input_indices: kept_input_indices.into(),
149            kept_input_overrides: kept_input_overrides.into(),
150            properties,
151            mutation_ctx,
152            metrics: ExecutionPlanMetricsSet::new(),
153        }
154    }
155}
156
157impl DisplayAs for GraphApplyExec {
158    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159        write!(
160            f,
161            "GraphApplyExec: filter={}",
162            if self.input_filter.is_some() {
163                "yes"
164            } else {
165                "none"
166            }
167        )
168    }
169}
170
171impl ExecutionPlan for GraphApplyExec {
172    fn name(&self) -> &str {
173        "GraphApplyExec"
174    }
175
176    fn as_any(&self) -> &dyn Any {
177        self
178    }
179
180    fn schema(&self) -> SchemaRef {
181        self.output_schema.clone()
182    }
183
184    fn properties(&self) -> &Arc<PlanProperties> {
185        &self.properties
186    }
187
188    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
189        // No physical children — sub-plans are re-planned at execution time
190        vec![]
191    }
192
193    fn with_new_children(
194        self: Arc<Self>,
195        children: Vec<Arc<dyn ExecutionPlan>>,
196    ) -> DFResult<Arc<dyn ExecutionPlan>> {
197        if !children.is_empty() {
198            return Err(datafusion::error::DataFusionError::Plan(
199                "GraphApplyExec has no children".to_string(),
200            ));
201        }
202        Ok(self)
203    }
204
205    fn execute(
206        &self,
207        partition: usize,
208        _context: Arc<TaskContext>,
209    ) -> DFResult<SendableRecordBatchStream> {
210        let metrics = BaselineMetrics::new(&self.metrics, partition);
211
212        let input_exec = self.input_exec.clone();
213        let subquery_plan = self.subquery_plan.clone();
214        let input_filter = self.input_filter.clone();
215        let graph_ctx = self.graph_ctx.clone();
216        let session_ctx = self.session_ctx.clone();
217        let storage = self.storage.clone();
218        let schema_info = self.schema_info.clone();
219        let params = self.params.clone();
220        let output_schema = self.output_schema.clone();
221        let kept_input_indices = self.kept_input_indices.clone();
222        let kept_input_overrides = self.kept_input_overrides.clone();
223        let mutation_ctx = self.mutation_ctx.clone();
224
225        let fut = async move {
226            run_apply(
227                input_exec,
228                &subquery_plan,
229                input_filter.as_ref(),
230                &graph_ctx,
231                &session_ctx,
232                &storage,
233                &schema_info,
234                &params,
235                &output_schema,
236                &kept_input_indices,
237                &kept_input_overrides,
238                mutation_ctx.as_ref(),
239            )
240            .await
241        };
242
243        Ok(Box::pin(ApplyStream {
244            state: ApplyStreamState::Running(Box::pin(fut)),
245            schema: self.output_schema.clone(),
246            metrics,
247        }))
248    }
249
250    fn metrics(&self) -> Option<MetricsSet> {
251        Some(self.metrics.clone_inner())
252    }
253}
254
255// ---------------------------------------------------------------------------
256// Core apply logic
257// ---------------------------------------------------------------------------
258
259/// Convert record batches into row-oriented `HashMap<String, Value>` representation.
260fn batches_to_row_maps(batches: &[RecordBatch]) -> Vec<HashMap<String, Value>> {
261    batches
262        .iter()
263        .flat_map(|batch| {
264            (0..batch.num_rows()).map(move |row_idx| extract_row_params(batch, row_idx))
265        })
266        .collect()
267}
268
269/// Evaluate a Cypher filter expression against a row.
270///
271/// Supports simple binary comparisons and boolean operations needed for
272/// input_filter pushdown (e.g., `p.age > 30`, `p.status = 'active'`).
273fn evaluate_filter(filter: &Expr, row: &HashMap<String, Value>) -> bool {
274    match filter {
275        Expr::BinaryOp { left, op, right } => {
276            use uni_cypher::ast::BinaryOp;
277            match op {
278                BinaryOp::And => evaluate_filter(left, row) && evaluate_filter(right, row),
279                BinaryOp::Or => evaluate_filter(left, row) || evaluate_filter(right, row),
280                _ => {
281                    let left_val = resolve_expr_value(left, row);
282                    let right_val = resolve_expr_value(right, row);
283                    evaluate_comparison(op, &left_val, &right_val)
284                }
285            }
286        }
287        Expr::UnaryOp {
288            op: UnaryOp::Not,
289            expr,
290        } => !evaluate_filter(expr, row),
291        _ => {
292            // Treat any other expression as a truth test on its resolved value
293            let val = resolve_expr_value(filter, row);
294            val.as_bool().unwrap_or(false)
295        }
296    }
297}
298
299/// Resolve a simple expression to a Value using the row context.
300fn resolve_expr_value(expr: &Expr, row: &HashMap<String, Value>) -> Value {
301    match expr {
302        Expr::Literal(lit) => lit.to_value(),
303        Expr::Variable(name) => row.get(name).cloned().unwrap_or(Value::Null),
304        Expr::Property(base_expr, key) => {
305            if let Expr::Variable(var) = base_expr.as_ref() {
306                // Look up "var.key" in the row map
307                let col_name = format!("{}.{}", var, key);
308                row.get(&col_name).cloned().unwrap_or(Value::Null)
309            } else {
310                Value::Null
311            }
312        }
313        _ => Value::Null,
314    }
315}
316
317/// Compare two Values for ordering.
318fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
319    match (a, b) {
320        (Value::Int(a), Value::Int(b)) => Some(a.cmp(b)),
321        (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
322        (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
323        (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)),
324        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
325        _ => None,
326    }
327}
328
329/// Evaluate a binary comparison operator on two Values.
330///
331/// Handles equality (`Eq`, `NotEq`) directly and delegates ordering
332/// comparisons (`Lt`, `LtEq`, `Gt`, `GtEq`) to [`compare_values`].
333fn evaluate_comparison(op: &uni_cypher::ast::BinaryOp, left: &Value, right: &Value) -> bool {
334    use std::cmp::Ordering;
335    use uni_cypher::ast::BinaryOp;
336
337    match op {
338        BinaryOp::Eq => left == right,
339        BinaryOp::NotEq => left != right,
340        BinaryOp::Lt => compare_values(left, right) == Some(Ordering::Less),
341        BinaryOp::LtEq => matches!(
342            compare_values(left, right),
343            Some(Ordering::Less | Ordering::Equal)
344        ),
345        BinaryOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
346        BinaryOp::GtEq => matches!(
347            compare_values(left, right),
348            Some(Ordering::Greater | Ordering::Equal)
349        ),
350        _ => false,
351    }
352}
353
354/// Build a typed column from row maps using a builder and value extractor.
355///
356/// For each row, looks up `col_name`, applies `extract` to get an `Option<T>`,
357/// and appends the value or null to the builder.
358fn build_column<B, T>(
359    rows: &[HashMap<String, Value>],
360    col_name: &str,
361    mut builder: B,
362    extract: impl Fn(&Value) -> Option<T>,
363) -> ArrayRef
364where
365    B: arrow_array::builder::ArrayBuilder,
366    B: PrimitiveAppend<T>,
367{
368    for row in rows {
369        match row.get(col_name).and_then(&extract) {
370            Some(v) => builder.append_typed_value(v),
371            None => builder.append_typed_null(),
372        }
373    }
374    Arc::new(builder.finish_to_array())
375}
376
377/// Trait to abstract over typed append for primitive Arrow builders.
378///
379/// This avoids repeating the same get-value/convert/append-or-null pattern
380/// for each numeric/boolean type in `rows_to_batch`.
381trait PrimitiveAppend<T> {
382    fn append_typed_value(&mut self, val: T);
383    fn append_typed_null(&mut self);
384    fn finish_to_array(self) -> ArrayRef;
385}
386
387macro_rules! impl_primitive_append {
388    ($builder:ty, $native:ty, $array:ty) => {
389        impl PrimitiveAppend<$native> for $builder {
390            fn append_typed_value(&mut self, val: $native) {
391                self.append_value(val);
392            }
393            fn append_typed_null(&mut self) {
394                self.append_null();
395            }
396            fn finish_to_array(mut self) -> ArrayRef {
397                Arc::new(self.finish()) as ArrayRef
398            }
399        }
400    };
401}
402
403impl_primitive_append!(UInt64Builder, u64, arrow_array::UInt64Array);
404impl_primitive_append!(Int64Builder, i64, arrow_array::Int64Array);
405impl_primitive_append!(Int32Builder, i32, arrow_array::Int32Array);
406impl_primitive_append!(Float64Builder, f64, arrow_array::Float64Array);
407impl_primitive_append!(BooleanBuilder, bool, arrow_array::BooleanArray);
408
409/// Build a RecordBatch from merged row maps using the output schema.
410fn rows_to_batch(rows: &[HashMap<String, Value>], schema: &SchemaRef) -> DFResult<RecordBatch> {
411    if rows.is_empty() {
412        return Ok(RecordBatch::new_empty(schema.clone()));
413    }
414
415    let num_rows = rows.len();
416    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
417
418    for field in schema.fields() {
419        let col_name = field.name();
420        let col = match field.data_type() {
421            DataType::UInt64 => build_column(
422                rows,
423                col_name,
424                UInt64Builder::with_capacity(num_rows),
425                |v| v.as_u64().or_else(|| v.as_i64().map(|i| i as u64)),
426            ),
427            DataType::Int64 => build_column(
428                rows,
429                col_name,
430                Int64Builder::with_capacity(num_rows),
431                Value::as_i64,
432            ),
433            DataType::Int32 => {
434                build_column(rows, col_name, Int32Builder::with_capacity(num_rows), |v| {
435                    v.as_i64().map(|i| i as i32)
436                })
437            }
438            DataType::Float64 => build_column(
439                rows,
440                col_name,
441                Float64Builder::with_capacity(num_rows),
442                Value::as_f64,
443            ),
444            DataType::Boolean => build_column(
445                rows,
446                col_name,
447                BooleanBuilder::with_capacity(num_rows),
448                Value::as_bool,
449            ),
450            DataType::LargeBinary => {
451                let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(
452                    num_rows,
453                    num_rows * 64,
454                );
455                for row in rows {
456                    match row.get(col_name) {
457                        Some(val) if !val.is_null() => {
458                            let cv_bytes = uni_common::cypher_value_codec::encode(val);
459                            builder.append_value(&cv_bytes);
460                        }
461                        _ => builder.append_null(),
462                    }
463                }
464                Arc::new(builder.finish()) as ArrayRef
465            }
466            DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
467                let mut builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
468                for row in rows {
469                    match row.get(col_name) {
470                        Some(Value::List(items)) => {
471                            for item in items {
472                                match item {
473                                    Value::String(s) => builder.values().append_value(s),
474                                    Value::Null => builder.values().append_null(),
475                                    other => builder.values().append_value(format!("{other}")),
476                                }
477                            }
478                            builder.append(true);
479                        }
480                        _ => builder.append_null(),
481                    }
482                }
483                Arc::new(builder.finish()) as ArrayRef
484            }
485            DataType::Null => Arc::new(arrow_array::NullArray::new(num_rows)) as ArrayRef,
486            // Default: Utf8 for everything else
487            _ => {
488                let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
489                for row in rows {
490                    match row.get(col_name) {
491                        Some(Value::Null) | None => builder.append_null(),
492                        Some(Value::String(s)) => builder.append_value(s),
493                        Some(other) => builder.append_value(format!("{other}")),
494                    }
495                }
496                Arc::new(builder.finish()) as ArrayRef
497            }
498        };
499        columns.push(col);
500    }
501
502    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
503}
504
505/// Slice a single row, projecting only the input columns whose name survives
506/// the Apply schema merge (i.e., not overridden by a subquery RETURN column).
507fn slice_kept_row(batch: &RecordBatch, row_idx: usize, kept: &[usize]) -> Vec<ArrayRef> {
508    kept.iter()
509        .map(|&i| batch.column(i).slice(row_idx, 1))
510        .collect()
511}
512
513/// Check if a logical plan is or contains a ProcedureCall node.
514/// This helps distinguish procedure calls (CALL...YIELD) from regular subqueries (CALL { ... }).
515fn is_procedure_call(plan: &LogicalPlan) -> bool {
516    match plan {
517        LogicalPlan::ProcedureCall { .. } => true,
518        LogicalPlan::Project { input, .. }
519        | LogicalPlan::Filter { input, .. }
520        | LogicalPlan::Sort { input, .. }
521        | LogicalPlan::Limit { input, .. }
522        | LogicalPlan::Distinct { input } => is_procedure_call(input),
523        _ => false,
524    }
525}
526
527/// Recursively check whether a logical plan contains any write operation.
528///
529/// Subqueries that mutate state must execute once per correlated input row;
530/// the IN-list batching optimization is safe only for read-only subqueries.
531fn plan_contains_writes(plan: &LogicalPlan) -> bool {
532    use crate::query::planner::LogicalPlan as LP;
533    match plan {
534        LP::Create { .. }
535        | LP::CreateBatch { .. }
536        | LP::Merge { .. }
537        | LP::Delete { .. }
538        | LP::Set { .. }
539        | LP::Remove { .. }
540        | LP::Foreach { .. } => true,
541        LP::Project { input, .. }
542        | LP::Filter { input, .. }
543        | LP::Sort { input, .. }
544        | LP::Limit { input, .. }
545        | LP::Distinct { input }
546        | LP::Unwind { input, .. }
547        | LP::Aggregate { input, .. } => plan_contains_writes(input),
548        LP::Apply {
549            input, subquery, ..
550        }
551        | LP::SubqueryCall { input, subquery } => {
552            plan_contains_writes(input) || plan_contains_writes(subquery)
553        }
554        _ => false,
555    }
556}
557
558/// Build an owned, canonical cache key for a row's correlation parameters.
559///
560/// A `BTreeMap` makes the key order-independent and gives real `Hash` + `Eq`
561/// over `Value`. The previous implementation keyed the dedup cache by a bare
562/// `u64` `DefaultHasher` digest of `format!("{val:?}")` with NO equality
563/// re-check, so any hash collision (or two values whose Debug renders
564/// identically) returned another row's subquery results. (review H7)
565fn canonical_params_key(params: &HashMap<String, Value>) -> BTreeMap<String, Value> {
566    params.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
567}
568
569/// Check if batching is eligible for this apply operation.
570/// Returns true if:
571/// - There are 2+ filtered entries (single row → existing path)
572/// - At least one `._vid` correlation key exists
573fn is_batch_eligible(filtered_entries: &[(&RecordBatch, usize, HashMap<String, Value>)]) -> bool {
574    if filtered_entries.len() < 2 {
575        return false;
576    }
577
578    // Check if at least one correlation key (._vid) exists
579    filtered_entries
580        .iter()
581        .any(|(_, _, row_params)| row_params.keys().any(|k| k.ends_with("._vid")))
582}
583
584/// Run the apply operation: execute input, filter, correlate subquery, merge results.
585///
586/// Uses Arrow-native row slicing for input columns to preserve complex types
587/// (Struct, List, etc.), and only converts to Value for parameter injection.
588#[expect(clippy::too_many_arguments)]
589async fn run_apply(
590    input_exec: Arc<dyn ExecutionPlan>,
591    subquery_plan: &LogicalPlan,
592    input_filter: Option<&Expr>,
593    graph_ctx: &Arc<GraphExecutionContext>,
594    session_ctx: &Arc<RwLock<SessionContext>>,
595    storage: &Arc<StorageManager>,
596    schema_info: &Arc<UniSchema>,
597    params: &HashMap<String, Value>,
598    output_schema: &SchemaRef,
599    kept_input_indices: &[usize],
600    kept_input_overrides: &[Option<(String, String)>],
601    mutation_ctx: Option<&Arc<MutationContext>>,
602) -> DFResult<RecordBatch> {
603    let apply_start = std::time::Instant::now();
604    let is_proc_call = is_procedure_call(subquery_plan);
605    tracing::debug!("run_apply: is_procedure_call={}", is_proc_call);
606
607    // 1. Execute pre-planned input physical plan directly
608    let task_ctx = session_ctx.read().task_ctx();
609    let input_batches = collect_all_partitions(&input_exec, task_ctx).await?;
610
611    // 2. Collect (batch_ref, row_idx) for rows that pass the input filter,
612    //    along with their Value-based params for subquery injection.
613    let mut filtered_entries: Vec<(&RecordBatch, usize, HashMap<String, Value>)> = Vec::new();
614    for batch in &input_batches {
615        for row_idx in 0..batch.num_rows() {
616            let row_params = extract_row_params(batch, row_idx);
617            if let Some(filter) = input_filter
618                && !evaluate_filter(filter, &row_params)
619            {
620                continue;
621            }
622            filtered_entries.push((batch, row_idx, row_params));
623        }
624    }
625
626    tracing::debug!(
627        "run_apply: filtered_entries count = {}",
628        filtered_entries.len()
629    );
630
631    let subquery_has_writes = plan_contains_writes(subquery_plan);
632
633    // 3. Handle empty input: execute subquery once with base params.
634    //
635    // For unit subqueries (no RETURN, schema has no subquery fields) we skip
636    // the call entirely: with zero outer rows there's nothing to drive
637    // per-row side effects, and correlated parameter resolution would fail
638    // (`Unresolved parameter: $n`). The same logic applies to any
639    // write-bearing subquery — running it once with no outer correlation
640    // would either fail to resolve params or write phantom rows.
641    let is_unit_subquery = output_schema.fields().len() == kept_input_indices.len();
642    if filtered_entries.is_empty() {
643        if is_unit_subquery || subquery_has_writes {
644            return Ok(RecordBatch::new_empty(output_schema.clone()));
645        }
646        let sub_batches = execute_subplan(
647            subquery_plan,
648            params,
649            &HashMap::new(), // No outer values for empty input case
650            graph_ctx,
651            session_ctx,
652            storage,
653            schema_info,
654            mutation_ctx,
655        )
656        .await?;
657        let sub_rows = batches_to_row_maps(&sub_batches);
658        return rows_to_batch(&sub_rows, output_schema);
659    }
660
661    // 4. Check if we can batch the subplan execution
662    // IMPORTANT: Only batch when NOT a procedure call AND has input_filter.
663    // - Procedure calls use outer_values (not params), incompatible with batching
664    // - No input_filter indicates CALL subquery (e.g., MATCH (p) CALL { MATCH (p) })
665    //   which requires per-row correlation, not batching
666    // - Target pattern: procedure call → Apply with filter → MATCH traversal
667    let has_filter = input_filter.is_some();
668
669    if is_batch_eligible(&filtered_entries) && !is_proc_call && has_filter && !subquery_has_writes {
670        tracing::debug!("run_apply: batching eligible, attempting batch execution");
671
672        // Collect unique VID values and build batched params
673        let mut vid_values: HashMap<String, Vec<Value>> = HashMap::new();
674        for (_, _, row_params) in &filtered_entries {
675            for (key, value) in row_params {
676                if key.ends_with("._vid") {
677                    vid_values
678                        .entry(key.clone())
679                        .or_default()
680                        .push(value.clone());
681                }
682            }
683        }
684
685        // Build batched params: VID keys become Value::List
686        let mut batched_params = params.clone();
687        for (key, values) in &vid_values {
688            batched_params.insert(key.clone(), Value::List(values.clone()));
689        }
690
691        // Add carry-through parameters from first row (for literals in projections)
692        // These won't affect the WHERE filter but ensure planning succeeds
693        if let Some((_, _, first_row_params)) = filtered_entries.first() {
694            for (key, value) in first_row_params {
695                if !key.ends_with("._vid") {
696                    batched_params
697                        .entry(key.clone())
698                        .or_insert_with(|| value.clone());
699                }
700            }
701        }
702
703        // Execute subquery ONCE with batched VID params
704        let subplan_start = std::time::Instant::now();
705        let sub_batches = execute_subplan(
706            subquery_plan,
707            &batched_params,
708            &HashMap::new(),
709            graph_ctx,
710            session_ctx,
711            storage,
712            schema_info,
713            mutation_ctx,
714        )
715        .await?;
716        let subplan_elapsed = subplan_start.elapsed();
717        tracing::debug!(
718            "run_apply: batch execute_subplan took {:?}",
719            subplan_elapsed
720        );
721
722        // Build hash index: VID → Vec<subquery result rows>
723        let sub_rows = batches_to_row_maps(&sub_batches);
724        let mut sub_index: HashMap<i64, Vec<&HashMap<String, Value>>> = HashMap::new();
725
726        // Find the VID key (should be the same for all rows)
727        let vid_key = vid_values.keys().next().expect("at least one VID key");
728
729        for sub_row in &sub_rows {
730            if let Some(Value::Int(vid)) = sub_row.get(vid_key) {
731                sub_index.entry(*vid).or_default().push(sub_row);
732            }
733        }
734
735        // Hash-join: for each input row, look up by VID, emit input+subquery columns.
736        // `kept_input_indices` filters out input columns whose names are
737        // overridden by subquery RETURN columns.
738        let num_input_cols = kept_input_indices.len();
739        let num_output_cols = output_schema.fields().len();
740        let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
741
742        for (batch, row_idx, row_params) in &filtered_entries {
743            // Extract VID from row params
744            let input_vid = if let Some(Value::Int(vid)) = row_params.get(vid_key) {
745                *vid
746            } else {
747                continue; // Skip if VID is not present
748            };
749
750            let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
751
752            // Look up matching subquery rows by VID
753            if let Some(matching_sub_rows) = sub_index.get(&input_vid) {
754                for sub_row in matching_sub_rows {
755                    append_cross_join_row(
756                        &mut column_arrays,
757                        &input_row_arrays,
758                        sub_row,
759                        output_schema,
760                        num_input_cols,
761                        kept_input_overrides,
762                        is_unit_subquery,
763                    )?;
764                }
765            } else if is_unit_subquery {
766                // Unit subquery: side effects (writes) have run as part of the
767                // bulk sub-plan execution above; pass the input row through.
768                for (col_idx, arr) in input_row_arrays.iter().enumerate() {
769                    column_arrays[col_idx].push(arr.clone());
770                }
771            }
772            // else: inner join — skip input row (no subquery matches)
773        }
774
775        let result = concat_column_arrays(&column_arrays, output_schema);
776
777        let apply_elapsed = apply_start.elapsed();
778        tracing::debug!(
779            "run_apply: completed (batched) in {:?}, 1 subplan execution",
780            apply_elapsed
781        );
782
783        return result;
784    }
785
786    // 5. Fallback: For each input row, execute subquery and collect output column arrays.
787    //    Used when batching is not eligible (single row, no VID keys, or procedure call).
788    //    Each output row is: surviving input columns (sliced via
789    //    `kept_input_indices`) + subquery columns. Input columns whose name is
790    //    overridden by a subquery RETURN are dropped here so the merged
791    //    `output_schema` matches the data layout.
792    let num_input_cols = kept_input_indices.len();
793    let num_output_cols = output_schema.fields().len();
794    // Accumulate per-column arrays for all output rows
795    let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
796
797    let mut total_subplan_time = std::time::Duration::ZERO;
798    let mut subplan_executions = 0;
799
800    // Cache to deduplicate subplan executions for identical row parameters.
801    // Keyed by the owned, sorted params (real Hash + Eq) — not a bare u64 hash —
802    // so a hash collision can never serve a different row's results. (review H7)
803    let mut subplan_cache: HashMap<BTreeMap<String, Value>, Vec<HashMap<String, Value>>> =
804        HashMap::new();
805    let mut cache_hits = 0;
806
807    for (batch, row_idx, row_params) in &filtered_entries {
808        // For procedure calls (CALL...YIELD), pass row_params as outer_values to avoid
809        // shadowing user parameters. For regular subqueries (CALL { ... }), merge them
810        // into parameters for backward compatibility with correlated variables.
811        let (sub_params, sub_outer_values) = if is_procedure_call(subquery_plan) {
812            // Procedure call: keep params separate from outer values
813            (params.clone(), row_params.clone())
814        } else {
815            // Regular subquery: merge outer values into params (old behavior)
816            let mut merged = params.clone();
817            merged.extend(row_params.clone());
818            (merged, HashMap::new())
819        };
820
821        // Check cache for identical row params
822        let params_key = canonical_params_key(row_params);
823        let sub_rows = if let Some(cached_rows) = subplan_cache.get(&params_key) {
824            // Cache hit: reuse previous results
825            cache_hits += 1;
826            tracing::debug!("run_apply: cache hit for row params, skipping execute_subplan");
827            cached_rows.clone()
828        } else {
829            // Cache miss: execute subplan
830            let subplan_start = std::time::Instant::now();
831            let sub_batches = execute_subplan(
832                subquery_plan,
833                &sub_params,
834                &sub_outer_values,
835                graph_ctx,
836                session_ctx,
837                storage,
838                schema_info,
839                mutation_ctx,
840            )
841            .await?;
842            let subplan_elapsed = subplan_start.elapsed();
843            total_subplan_time += subplan_elapsed;
844            subplan_executions += 1;
845
846            tracing::debug!(
847                "run_apply: execute_subplan #{} took {:?}",
848                subplan_executions,
849                subplan_elapsed
850            );
851
852            let rows = batches_to_row_maps(&sub_batches);
853            subplan_cache.insert(params_key, rows.clone());
854            rows
855        };
856
857        let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
858
859        if sub_rows.is_empty() {
860            if is_unit_subquery {
861                // Unit subquery: side effects have executed; pass the input
862                // row through (no subquery columns to append).
863                for (col_idx, arr) in input_row_arrays.iter().enumerate() {
864                    column_arrays[col_idx].push(arr.clone());
865                }
866            }
867            // else: inner-join semantics — skip this input row.
868            continue;
869        }
870
871        for sub_row in &sub_rows {
872            append_cross_join_row(
873                &mut column_arrays,
874                &input_row_arrays,
875                sub_row,
876                output_schema,
877                num_input_cols,
878                kept_input_overrides,
879                is_unit_subquery,
880            )?;
881        }
882    }
883
884    // 5. Concatenate all accumulated arrays per column
885    let result = concat_column_arrays(&column_arrays, output_schema);
886
887    let apply_elapsed = apply_start.elapsed();
888    tracing::debug!(
889        "run_apply: completed in {:?}, {} subplan executions, {} cache hits, {:?} total subplan time",
890        apply_elapsed,
891        subplan_executions,
892        cache_hits,
893        total_subplan_time
894    );
895
896    result
897}
898
899/// Build a single-row Arrow array from a builder and optional value.
900fn single_row_array<B, T>(mut builder: B, val: Option<T>) -> ArrayRef
901where
902    B: PrimitiveAppend<T>,
903{
904    match val {
905        Some(v) => builder.append_typed_value(v),
906        None => builder.append_typed_null(),
907    }
908    builder.finish_to_array()
909}
910
911/// Convert a single Value to a single-row Arrow array of the given type.
912fn value_to_single_row_array(val: &Value, data_type: &DataType) -> DFResult<ArrayRef> {
913    Ok(match data_type {
914        DataType::UInt64 => single_row_array(
915            UInt64Builder::with_capacity(1),
916            val.as_u64().or_else(|| val.as_i64().map(|v| v as u64)),
917        ),
918        DataType::Int64 => single_row_array(Int64Builder::with_capacity(1), val.as_i64()),
919        DataType::Int32 => single_row_array(
920            Int32Builder::with_capacity(1),
921            val.as_i64().map(|v| v as i32),
922        ),
923        DataType::Float64 => single_row_array(Float64Builder::with_capacity(1), val.as_f64()),
924        DataType::Boolean => single_row_array(BooleanBuilder::with_capacity(1), val.as_bool()),
925        DataType::Null => Arc::new(arrow_array::NullArray::new(1)) as ArrayRef,
926        DataType::LargeBinary => {
927            let mut b = arrow_array::builder::LargeBinaryBuilder::with_capacity(1, 64);
928            if val.is_null() {
929                b.append_null();
930            } else {
931                let cv_bytes = uni_common::cypher_value_codec::encode(val);
932                b.append_value(&cv_bytes);
933            }
934            Arc::new(b.finish()) as ArrayRef
935        }
936        DataType::Utf8 => {
937            let mut b = StringBuilder::with_capacity(1, 64);
938            match val {
939                Value::Null => b.append_null(),
940                Value::String(s) => b.append_value(s),
941                other => b.append_value(format!("{other}")),
942            }
943            Arc::new(b.finish()) as ArrayRef
944        }
945        DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
946            let mut b = arrow_array::builder::ListBuilder::new(StringBuilder::new());
947            match val {
948                Value::List(items) => {
949                    for item in items {
950                        match item {
951                            Value::String(s) => b.values().append_value(s),
952                            Value::Null => b.values().append_null(),
953                            other => b.values().append_value(format!("{other}")),
954                        }
955                    }
956                    b.append(true);
957                }
958                Value::Null => b.append_null(),
959                other => {
960                    b.values().append_value(format!("{other}"));
961                    b.append(true);
962                }
963            }
964            Arc::new(b.finish()) as ArrayRef
965        }
966        DataType::Struct(fields) => {
967            // Encode a graph entity (`Value::Map` / `Value::Node` / `Value::Edge`)
968            // into a single-row StructArray matching the declared field
969            // layout. Used by the unit-subquery refresh path so the bare
970            // entity column reflects post-SET state — `compile_property_access`
971            // (expr_compiler.rs) tries struct-field extraction before flat
972            // columns, so a stale Struct would shadow our refreshed dotted
973            // columns.
974            let map_view: Option<&HashMap<String, Value>> = match val {
975                Value::Map(m) => Some(m),
976                Value::Node(n) => Some(&n.properties),
977                Value::Edge(e) => Some(&e.properties),
978                _ => None,
979            };
980            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(fields.len());
981            for child_field in fields.iter() {
982                let child_val = map_view
983                    .and_then(|m| m.get(child_field.name()))
984                    .cloned()
985                    .unwrap_or(Value::Null);
986                child_arrays.push(value_to_single_row_array(
987                    &child_val,
988                    child_field.data_type(),
989                )?);
990            }
991            let pairs: Vec<(Arc<arrow_schema::Field>, ArrayRef)> =
992                fields.iter().cloned().zip(child_arrays).collect();
993            Arc::new(arrow_array::StructArray::from(pairs)) as ArrayRef
994        }
995        _ => {
996            debug_assert!(
997                false,
998                "value_to_single_row_array: unhandled DataType {:?} — mirror the arm in rows_to_batch",
999                data_type
1000            );
1001            let mut b = StringBuilder::with_capacity(1, 64);
1002            match val {
1003                Value::Null => b.append_null(),
1004                Value::String(s) => b.append_value(s),
1005                other => b.append_value(format!("{other}")),
1006            }
1007            Arc::new(b.finish()) as ArrayRef
1008        }
1009    })
1010}
1011
1012/// Append one cross-joined row (input + subquery) to the per-column accumulator.
1013///
1014/// Input columns use Arrow-native sliced arrays to preserve complex types,
1015/// EXCEPT:
1016///   * `kept_input_overrides[i] = Some((var, prop))` — refresh `var.prop`
1017///     from the subquery's post-SET bare `var` Map in `sub_row` so dotted
1018///     columns surface fresh values across the Apply boundary.
1019///   * When `is_unit_subquery` is true, ALSO refresh any kept input column
1020///     whose name appears as a key in `sub_row`. Unit subqueries (no
1021///     RETURN, write-only side effects) re-emit the modified outer row
1022///     under the SAME column names; using the sub_row value gives outer
1023///     `RETURN v.prop` the post-SET binding even though the unit subquery
1024///     contributes no explicit RETURN fields.
1025///
1026/// Subquery columns convert `Value` to single-row Arrow arrays as before.
1027fn append_cross_join_row(
1028    column_arrays: &mut [Vec<ArrayRef>],
1029    input_row_arrays: &[ArrayRef],
1030    sub_row: &HashMap<String, Value>,
1031    output_schema: &SchemaRef,
1032    num_input_cols: usize,
1033    kept_input_overrides: &[Option<(String, String)>],
1034    is_unit_subquery: bool,
1035) -> DFResult<()> {
1036    // Add input columns (Arrow-native), with per-column refresh from sub_row
1037    // when applicable (see fn-doc).
1038    for (col_idx, arr) in input_row_arrays.iter().enumerate() {
1039        if let Some(Some((var, prop))) = kept_input_overrides.get(col_idx) {
1040            let extracted = match sub_row.get(var) {
1041                Some(Value::Map(m)) => m.get(prop).cloned().unwrap_or(Value::Null),
1042                Some(Value::Node(n)) => n.properties.get(prop).cloned().unwrap_or(Value::Null),
1043                Some(Value::Edge(e)) => e.properties.get(prop).cloned().unwrap_or(Value::Null),
1044                _ => Value::Null,
1045            };
1046            let field = &output_schema.fields()[col_idx];
1047            let new_arr = value_to_single_row_array(&extracted, field.data_type())?;
1048            column_arrays[col_idx].push(new_arr);
1049            continue;
1050        }
1051        if is_unit_subquery {
1052            // Refresh the kept input column from the subquery's post-SET
1053            // sub_row. Two cases:
1054            //   * Dotted (`v.prop`): extract `prop` from `sub_row[v]` (the
1055            //     subquery emits the modified bare Map under key `v`, not
1056            //     as dotted columns).
1057            //   * Bare (`v`): replace with `sub_row[v]` itself, encoded
1058            //     into the field's declared type. This covers the Struct
1059            //     case (`value_to_single_row_array` now has a Struct arm)
1060            //     and is required because `compile_property_access` in
1061            //     `expr_compiler.rs` tries Struct-field extraction BEFORE
1062            //     the flat-column fallback — a stale Struct would shadow
1063            //     refreshed dotted columns.
1064            let field = &output_schema.fields()[col_idx];
1065            let refreshed: Option<Value> = if let Some(dot) = field.name().find('.') {
1066                let base = &field.name()[..dot];
1067                let prop = &field.name()[dot + 1..];
1068                match sub_row.get(base) {
1069                    Some(Value::Map(m)) => m.get(prop).cloned(),
1070                    Some(Value::Node(n)) => n.properties.get(prop).cloned(),
1071                    Some(Value::Edge(e)) => e.properties.get(prop).cloned(),
1072                    _ => None,
1073                }
1074            } else {
1075                sub_row.get(field.name()).cloned()
1076            };
1077            if let Some(val) = refreshed {
1078                let new_arr = value_to_single_row_array(&val, field.data_type())?;
1079                column_arrays[col_idx].push(new_arr);
1080                continue;
1081            }
1082        }
1083        column_arrays[col_idx].push(arr.clone());
1084    }
1085
1086    // Add subquery columns using Value -> Arrow conversion
1087    let num_output_cols = output_schema.fields().len();
1088    for (col_arr, field) in column_arrays[num_input_cols..num_output_cols]
1089        .iter_mut()
1090        .zip(output_schema.fields()[num_input_cols..num_output_cols].iter())
1091    {
1092        let col_name = field.name();
1093        let val = sub_row.get(col_name).cloned().unwrap_or(Value::Null);
1094        let arr = value_to_single_row_array(&val, field.data_type())?;
1095        col_arr.push(arr);
1096    }
1097    Ok(())
1098}
1099
1100/// Concatenate per-column array accumulators into a single `RecordBatch`.
1101///
1102/// Returns an empty batch if no rows were accumulated.
1103fn concat_column_arrays(
1104    column_arrays: &[Vec<ArrayRef>],
1105    output_schema: &SchemaRef,
1106) -> DFResult<RecordBatch> {
1107    if column_arrays[0].is_empty() {
1108        return Ok(RecordBatch::new_empty(output_schema.clone()));
1109    }
1110
1111    let mut final_columns: Vec<ArrayRef> = Vec::with_capacity(column_arrays.len());
1112    for arrays in column_arrays {
1113        let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(|a| a.as_ref()).collect();
1114        let concatenated = arrow::compute::concat(&refs).map_err(arrow_err)?;
1115        final_columns.push(concatenated);
1116    }
1117
1118    RecordBatch::try_new(output_schema.clone(), final_columns).map_err(arrow_err)
1119}
1120
1121// ---------------------------------------------------------------------------
1122// Stream implementation
1123// ---------------------------------------------------------------------------
1124
/// Stream state for the apply operation.
///
/// One-shot lifecycle: the stream polls `Running` until its future resolves,
/// emits that single result (batch or error), then moves to `Done`.
enum ApplyStreamState {
    /// The apply computation is running; holds the boxed future that produces
    /// the single output batch.
    Running(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Computation completed (successfully or with an error); the stream
    /// yields `None` from here on.
    Done,
}
1132
/// Stream that runs the apply operation and emits the result.
///
/// Yields at most one item (the computed `RecordBatch` or an error), then ends.
struct ApplyStream {
    /// Current execution state (running future or done).
    state: ApplyStreamState,
    /// Output schema reported through `RecordBatchStream::schema`.
    schema: SchemaRef,
    /// Baseline metrics: poll time is charged to `elapsed_compute`, and the
    /// emitted batch's row count is recorded as output.
    metrics: BaselineMetrics,
}
1139
1140impl Stream for ApplyStream {
1141    type Item = DFResult<RecordBatch>;
1142
1143    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1144        let metrics = self.metrics.clone();
1145        let _timer = metrics.elapsed_compute().timer();
1146        match &mut self.state {
1147            ApplyStreamState::Running(fut) => match fut.as_mut().poll(cx) {
1148                Poll::Ready(Ok(batch)) => {
1149                    self.metrics.record_output(batch.num_rows());
1150                    self.state = ApplyStreamState::Done;
1151                    Poll::Ready(Some(Ok(batch)))
1152                }
1153                Poll::Ready(Err(e)) => {
1154                    self.state = ApplyStreamState::Done;
1155                    Poll::Ready(Some(Err(e)))
1156                }
1157                Poll::Pending => Poll::Pending,
1158            },
1159            ApplyStreamState::Done => Poll::Ready(None),
1160        }
1161    }
1162}
1163
1164impl RecordBatchStream for ApplyStream {
1165    fn schema(&self) -> SchemaRef {
1166        self.schema.clone()
1167    }
1168}
1169
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned parameter map from borrowed `(name, value)` pairs.
    fn params(pairs: &[(&str, Value)]) -> HashMap<String, Value> {
        let mut map = HashMap::with_capacity(pairs.len());
        for (name, value) in pairs {
            map.insert((*name).to_string(), value.clone());
        }
        map
    }

    /// H7: the dedup cache key must separate rows by *value equality*, never by
    /// a lossy `u64` hash. Distinct param sets must get distinct keys, and
    /// identical params collapse to one key whatever their insertion order.
    #[test]
    fn test_canonical_params_key_distinguishes_by_value() {
        let key_x1 = canonical_params_key(&params(&[("x", Value::Int(1))]));
        let key_x2 = canonical_params_key(&params(&[("x", Value::Int(2))]));
        let key_x1_again = canonical_params_key(&params(&[("x", Value::Int(1))]));
        assert_ne!(key_x1, key_x2, "different values must yield different keys");
        assert_eq!(key_x1, key_x1_again, "equal values must yield equal keys");

        // Same content, inserted in opposite orders, must produce the same key.
        let forward = params(&[("a", Value::Int(1)), ("b", Value::String("z".into()))]);
        let backward = params(&[("b", Value::String("z".into())), ("a", Value::Int(1))]);
        assert_eq!(canonical_params_key(&forward), canonical_params_key(&backward));

        // Used as a real cache key: two distinct rows never alias each other.
        let mut cache: HashMap<BTreeMap<String, Value>, &str> = HashMap::new();
        for (key, label) in [(&key_x1, "row-1"), (&key_x2, "row-2")] {
            cache.insert(key.clone(), label);
        }
        assert_eq!(cache.get(&key_x1), Some(&"row-1"));
        assert_eq!(cache.get(&key_x2), Some(&"row-2"));
        assert_eq!(cache.len(), 2);
    }
}