Skip to main content

uni_query/query/df_graph/
apply.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Apply (correlated subquery) execution plan for DataFusion.
5//!
6//! Implements `CALL { ... }` subqueries by executing the subquery once per
7//! input row, injecting the input row's columns as parameters, and cross-joining
8//! the results.
9//!
10//! # Semantics
11//!
12//! For each row from the input plan:
13//! 1. Optionally filter via `input_filter`
14//! 2. Inject the input row's columns as parameters
15//! 3. Re-plan and execute the subquery with those parameters
16//! 4. Cross-join: merge each subquery result row with the input row
17//!
18//! If input produces zero rows (after filtering), execute the subquery once
19//! with the base parameters (standalone CALL support).
20
21use crate::query::df_graph::common::{
22    arrow_err, collect_all_partitions, compute_plan_properties, execute_subplan, extract_row_params,
23};
24use crate::query::df_graph::{GraphExecutionContext, MutationContext};
25use crate::query::planner::LogicalPlan;
26use arrow_array::builder::{
27    BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder, UInt64Builder,
28};
29use arrow_array::{ArrayRef, RecordBatch};
30use arrow_schema::{DataType, SchemaRef};
31use datafusion::common::Result as DFResult;
32use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
35use datafusion::prelude::SessionContext;
36use futures::Stream;
37use parking_lot::RwLock;
38use std::any::Any;
39use std::collections::HashMap;
40use std::collections::hash_map::DefaultHasher;
41use std::fmt;
42use std::hash::{Hash, Hasher};
43use std::pin::Pin;
44use std::sync::Arc;
45use std::task::{Context, Poll};
46use uni_common::Value;
47use uni_common::core::schema::Schema as UniSchema;
48use uni_cypher::ast::{Expr, UnaryOp};
49use uni_store::storage::manager::StorageManager;
50
/// Apply (correlated subquery) execution plan.
///
/// The input is pre-planned as a physical plan (executed directly).
/// The subquery is stored as a **logical** plan and re-planned per row at runtime
/// with correlated parameters injected.
/// Handles both `SubqueryCall` (no input_filter) and `Apply` (with input_filter).
///
/// All fields are private; instances are built through [`GraphApplyExec::new`]
/// and every field is cloned into the async task started by `execute`.
pub struct GraphApplyExec {
    /// Physical plan for the driving input (e.g., MATCH scan).
    /// Pre-planned at construction time to preserve property context.
    input_exec: Arc<dyn ExecutionPlan>,

    /// Logical plan for the correlated subquery (re-planned per row).
    subquery_plan: LogicalPlan,

    /// Optional pre-filter applied to input rows before subquery execution.
    /// Only its presence is reported by the `Debug` / `DisplayAs` output.
    input_filter: Option<Expr>,

    /// Graph execution context shared with sub-planners.
    graph_ctx: Arc<GraphExecutionContext>,

    /// DataFusion session context.
    session_ctx: Arc<RwLock<SessionContext>>,

    /// Storage manager for creating sub-planners.
    storage: Arc<StorageManager>,

    /// Schema for label/edge type lookups.
    schema_info: Arc<UniSchema>,

    /// Query parameters.
    params: HashMap<String, Value>,

    /// Output schema (merged: surviving input columns + subquery columns).
    /// Subquery fields override input fields of the same name.
    /// This is the schema returned by `ExecutionPlan::schema`.
    output_schema: SchemaRef,

    /// Indices into `input_exec.schema()` for input columns that survive the
    /// schema merge (i.e., their name is NOT also in the subquery's output).
    /// Pre-computed at construction so the per-row hot path avoids re-deriving
    /// the filter. The leading `kept_input_indices.len()` columns of
    /// `output_schema` correspond 1:1 to these input indices.
    kept_input_indices: Arc<[usize]>,

    /// Parallel to `kept_input_indices`: when `Some((var, prop))`, the kept
    /// input column `var.prop` must be refreshed from `sub_row[var]`'s Map
    /// instead of sliced from the input batch. This carries SET-mutated
    /// dotted columns from the subquery's post-SET Map across the Apply
    /// boundary so the outer plan's `RETURN v.prop` sees the updated value.
    kept_input_overrides: Arc<[Option<(String, String)>]>,

    /// Cached plan properties, computed once from `output_schema` in `new`.
    properties: Arc<PlanProperties>,

    /// Outer mutation context, threaded into the per-row sub-planner so that
    /// `CALL { ... SET/CREATE/MERGE/DELETE ... }` writes route through the
    /// same transaction's L0 buffer. `None` for read-only outer plans.
    mutation_ctx: Option<Arc<MutationContext>>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
112
113impl fmt::Debug for GraphApplyExec {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        f.debug_struct("GraphApplyExec")
116            .field("has_input_filter", &self.input_filter.is_some())
117            .finish()
118    }
119}
120
121impl GraphApplyExec {
122    /// Create a new Apply execution plan.
123    #[expect(clippy::too_many_arguments)]
124    pub fn new(
125        input_exec: Arc<dyn ExecutionPlan>,
126        subquery_plan: LogicalPlan,
127        input_filter: Option<Expr>,
128        graph_ctx: Arc<GraphExecutionContext>,
129        session_ctx: Arc<RwLock<SessionContext>>,
130        storage: Arc<StorageManager>,
131        schema_info: Arc<UniSchema>,
132        params: HashMap<String, Value>,
133        output_schema: SchemaRef,
134        kept_input_indices: Vec<usize>,
135        kept_input_overrides: Vec<Option<(String, String)>>,
136        mutation_ctx: Option<Arc<MutationContext>>,
137    ) -> Self {
138        let properties = compute_plan_properties(output_schema.clone());
139
140        Self {
141            input_exec,
142            subquery_plan,
143            input_filter,
144            graph_ctx,
145            session_ctx,
146            storage,
147            schema_info,
148            params,
149            output_schema,
150            kept_input_indices: kept_input_indices.into(),
151            kept_input_overrides: kept_input_overrides.into(),
152            properties,
153            mutation_ctx,
154            metrics: ExecutionPlanMetricsSet::new(),
155        }
156    }
157}
158
159impl DisplayAs for GraphApplyExec {
160    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        write!(
162            f,
163            "GraphApplyExec: filter={}",
164            if self.input_filter.is_some() {
165                "yes"
166            } else {
167                "none"
168            }
169        )
170    }
171}
172
173impl ExecutionPlan for GraphApplyExec {
174    fn name(&self) -> &str {
175        "GraphApplyExec"
176    }
177
178    fn as_any(&self) -> &dyn Any {
179        self
180    }
181
182    fn schema(&self) -> SchemaRef {
183        self.output_schema.clone()
184    }
185
186    fn properties(&self) -> &Arc<PlanProperties> {
187        &self.properties
188    }
189
190    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
191        // No physical children — sub-plans are re-planned at execution time
192        vec![]
193    }
194
195    fn with_new_children(
196        self: Arc<Self>,
197        children: Vec<Arc<dyn ExecutionPlan>>,
198    ) -> DFResult<Arc<dyn ExecutionPlan>> {
199        if !children.is_empty() {
200            return Err(datafusion::error::DataFusionError::Plan(
201                "GraphApplyExec has no children".to_string(),
202            ));
203        }
204        Ok(self)
205    }
206
207    fn execute(
208        &self,
209        partition: usize,
210        _context: Arc<TaskContext>,
211    ) -> DFResult<SendableRecordBatchStream> {
212        let metrics = BaselineMetrics::new(&self.metrics, partition);
213
214        let input_exec = self.input_exec.clone();
215        let subquery_plan = self.subquery_plan.clone();
216        let input_filter = self.input_filter.clone();
217        let graph_ctx = self.graph_ctx.clone();
218        let session_ctx = self.session_ctx.clone();
219        let storage = self.storage.clone();
220        let schema_info = self.schema_info.clone();
221        let params = self.params.clone();
222        let output_schema = self.output_schema.clone();
223        let kept_input_indices = self.kept_input_indices.clone();
224        let kept_input_overrides = self.kept_input_overrides.clone();
225        let mutation_ctx = self.mutation_ctx.clone();
226
227        let fut = async move {
228            run_apply(
229                input_exec,
230                &subquery_plan,
231                input_filter.as_ref(),
232                &graph_ctx,
233                &session_ctx,
234                &storage,
235                &schema_info,
236                &params,
237                &output_schema,
238                &kept_input_indices,
239                &kept_input_overrides,
240                mutation_ctx.as_ref(),
241            )
242            .await
243        };
244
245        Ok(Box::pin(ApplyStream {
246            state: ApplyStreamState::Running(Box::pin(fut)),
247            schema: self.output_schema.clone(),
248            metrics,
249        }))
250    }
251
252    fn metrics(&self) -> Option<MetricsSet> {
253        Some(self.metrics.clone_inner())
254    }
255}
256
257// ---------------------------------------------------------------------------
258// Core apply logic
259// ---------------------------------------------------------------------------
260
261/// Convert record batches into row-oriented `HashMap<String, Value>` representation.
262fn batches_to_row_maps(batches: &[RecordBatch]) -> Vec<HashMap<String, Value>> {
263    batches
264        .iter()
265        .flat_map(|batch| {
266            (0..batch.num_rows()).map(move |row_idx| extract_row_params(batch, row_idx))
267        })
268        .collect()
269}
270
271/// Evaluate a Cypher filter expression against a row.
272///
273/// Supports simple binary comparisons and boolean operations needed for
274/// input_filter pushdown (e.g., `p.age > 30`, `p.status = 'active'`).
275fn evaluate_filter(filter: &Expr, row: &HashMap<String, Value>) -> bool {
276    match filter {
277        Expr::BinaryOp { left, op, right } => {
278            use uni_cypher::ast::BinaryOp;
279            match op {
280                BinaryOp::And => evaluate_filter(left, row) && evaluate_filter(right, row),
281                BinaryOp::Or => evaluate_filter(left, row) || evaluate_filter(right, row),
282                _ => {
283                    let left_val = resolve_expr_value(left, row);
284                    let right_val = resolve_expr_value(right, row);
285                    evaluate_comparison(op, &left_val, &right_val)
286                }
287            }
288        }
289        Expr::UnaryOp {
290            op: UnaryOp::Not,
291            expr,
292        } => !evaluate_filter(expr, row),
293        _ => {
294            // Treat any other expression as a truth test on its resolved value
295            let val = resolve_expr_value(filter, row);
296            val.as_bool().unwrap_or(false)
297        }
298    }
299}
300
301/// Resolve a simple expression to a Value using the row context.
302fn resolve_expr_value(expr: &Expr, row: &HashMap<String, Value>) -> Value {
303    match expr {
304        Expr::Literal(lit) => lit.to_value(),
305        Expr::Variable(name) => row.get(name).cloned().unwrap_or(Value::Null),
306        Expr::Property(base_expr, key) => {
307            if let Expr::Variable(var) = base_expr.as_ref() {
308                // Look up "var.key" in the row map
309                let col_name = format!("{}.{}", var, key);
310                row.get(&col_name).cloned().unwrap_or(Value::Null)
311            } else {
312                Value::Null
313            }
314        }
315        _ => Value::Null,
316    }
317}
318
319/// Compare two Values for ordering.
320fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
321    match (a, b) {
322        (Value::Int(a), Value::Int(b)) => Some(a.cmp(b)),
323        (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
324        (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
325        (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)),
326        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
327        _ => None,
328    }
329}
330
331/// Evaluate a binary comparison operator on two Values.
332///
333/// Handles equality (`Eq`, `NotEq`) directly and delegates ordering
334/// comparisons (`Lt`, `LtEq`, `Gt`, `GtEq`) to [`compare_values`].
335fn evaluate_comparison(op: &uni_cypher::ast::BinaryOp, left: &Value, right: &Value) -> bool {
336    use std::cmp::Ordering;
337    use uni_cypher::ast::BinaryOp;
338
339    match op {
340        BinaryOp::Eq => left == right,
341        BinaryOp::NotEq => left != right,
342        BinaryOp::Lt => compare_values(left, right) == Some(Ordering::Less),
343        BinaryOp::LtEq => matches!(
344            compare_values(left, right),
345            Some(Ordering::Less | Ordering::Equal)
346        ),
347        BinaryOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
348        BinaryOp::GtEq => matches!(
349            compare_values(left, right),
350            Some(Ordering::Greater | Ordering::Equal)
351        ),
352        _ => false,
353    }
354}
355
356/// Build a typed column from row maps using a builder and value extractor.
357///
358/// For each row, looks up `col_name`, applies `extract` to get an `Option<T>`,
359/// and appends the value or null to the builder.
360fn build_column<B, T>(
361    rows: &[HashMap<String, Value>],
362    col_name: &str,
363    mut builder: B,
364    extract: impl Fn(&Value) -> Option<T>,
365) -> ArrayRef
366where
367    B: arrow_array::builder::ArrayBuilder,
368    B: PrimitiveAppend<T>,
369{
370    for row in rows {
371        match row.get(col_name).and_then(&extract) {
372            Some(v) => builder.append_typed_value(v),
373            None => builder.append_typed_null(),
374        }
375    }
376    Arc::new(builder.finish_to_array())
377}
378
/// Trait to abstract over typed append for primitive Arrow builders.
///
/// This avoids repeating the same get-value/convert/append-or-null pattern
/// for each numeric/boolean type in `rows_to_batch`.
///
/// `T` is the native Rust value type the builder accepts. Implementations are
/// generated by the `impl_primitive_append!` macro.
trait PrimitiveAppend<T> {
    /// Append one non-null value.
    fn append_typed_value(&mut self, val: T);
    /// Append one null entry.
    fn append_typed_null(&mut self);
    /// Consume the builder and return the finished column as an `ArrayRef`.
    fn finish_to_array(self) -> ArrayRef;
}
388
// Generates a `PrimitiveAppend<$native>` impl for `$builder` by forwarding to
// the builder's own `append_value`, `append_null` and `finish` methods.
//
// The third argument (`$array`) is accepted by the matcher but is not
// referenced in the expansion.
macro_rules! impl_primitive_append {
    ($builder:ty, $native:ty, $array:ty) => {
        impl PrimitiveAppend<$native> for $builder {
            fn append_typed_value(&mut self, val: $native) {
                self.append_value(val);
            }
            fn append_typed_null(&mut self) {
                self.append_null();
            }
            fn finish_to_array(mut self) -> ArrayRef {
                // Erase the concrete array type behind a shared `ArrayRef`.
                Arc::new(self.finish()) as ArrayRef
            }
        }
    };
}

// One impl per builder / native type pair used by `rows_to_batch`.
impl_primitive_append!(UInt64Builder, u64, arrow_array::UInt64Array);
impl_primitive_append!(Int64Builder, i64, arrow_array::Int64Array);
impl_primitive_append!(Int32Builder, i32, arrow_array::Int32Array);
impl_primitive_append!(Float64Builder, f64, arrow_array::Float64Array);
impl_primitive_append!(BooleanBuilder, bool, arrow_array::BooleanArray);
410
411/// Build a RecordBatch from merged row maps using the output schema.
412fn rows_to_batch(rows: &[HashMap<String, Value>], schema: &SchemaRef) -> DFResult<RecordBatch> {
413    if rows.is_empty() {
414        return Ok(RecordBatch::new_empty(schema.clone()));
415    }
416
417    let num_rows = rows.len();
418    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
419
420    for field in schema.fields() {
421        let col_name = field.name();
422        let col = match field.data_type() {
423            DataType::UInt64 => build_column(
424                rows,
425                col_name,
426                UInt64Builder::with_capacity(num_rows),
427                |v| v.as_u64().or_else(|| v.as_i64().map(|i| i as u64)),
428            ),
429            DataType::Int64 => build_column(
430                rows,
431                col_name,
432                Int64Builder::with_capacity(num_rows),
433                Value::as_i64,
434            ),
435            DataType::Int32 => {
436                build_column(rows, col_name, Int32Builder::with_capacity(num_rows), |v| {
437                    v.as_i64().map(|i| i as i32)
438                })
439            }
440            DataType::Float64 => build_column(
441                rows,
442                col_name,
443                Float64Builder::with_capacity(num_rows),
444                Value::as_f64,
445            ),
446            DataType::Boolean => build_column(
447                rows,
448                col_name,
449                BooleanBuilder::with_capacity(num_rows),
450                Value::as_bool,
451            ),
452            DataType::LargeBinary => {
453                let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(
454                    num_rows,
455                    num_rows * 64,
456                );
457                for row in rows {
458                    match row.get(col_name) {
459                        Some(val) if !val.is_null() => {
460                            let cv_bytes = uni_common::cypher_value_codec::encode(val);
461                            builder.append_value(&cv_bytes);
462                        }
463                        _ => builder.append_null(),
464                    }
465                }
466                Arc::new(builder.finish()) as ArrayRef
467            }
468            DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
469                let mut builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
470                for row in rows {
471                    match row.get(col_name) {
472                        Some(Value::List(items)) => {
473                            for item in items {
474                                match item {
475                                    Value::String(s) => builder.values().append_value(s),
476                                    Value::Null => builder.values().append_null(),
477                                    other => builder.values().append_value(format!("{other}")),
478                                }
479                            }
480                            builder.append(true);
481                        }
482                        _ => builder.append_null(),
483                    }
484                }
485                Arc::new(builder.finish()) as ArrayRef
486            }
487            DataType::Null => Arc::new(arrow_array::NullArray::new(num_rows)) as ArrayRef,
488            // Default: Utf8 for everything else
489            _ => {
490                let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
491                for row in rows {
492                    match row.get(col_name) {
493                        Some(Value::Null) | None => builder.append_null(),
494                        Some(Value::String(s)) => builder.append_value(s),
495                        Some(other) => builder.append_value(format!("{other}")),
496                    }
497                }
498                Arc::new(builder.finish()) as ArrayRef
499            }
500        };
501        columns.push(col);
502    }
503
504    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
505}
506
507/// Slice a single row, projecting only the input columns whose name survives
508/// the Apply schema merge (i.e., not overridden by a subquery RETURN column).
509fn slice_kept_row(batch: &RecordBatch, row_idx: usize, kept: &[usize]) -> Vec<ArrayRef> {
510    kept.iter()
511        .map(|&i| batch.column(i).slice(row_idx, 1))
512        .collect()
513}
514
515/// Check if a logical plan is or contains a ProcedureCall node.
516/// This helps distinguish procedure calls (CALL...YIELD) from regular subqueries (CALL { ... }).
517fn is_procedure_call(plan: &LogicalPlan) -> bool {
518    match plan {
519        LogicalPlan::ProcedureCall { .. } => true,
520        LogicalPlan::Project { input, .. }
521        | LogicalPlan::Filter { input, .. }
522        | LogicalPlan::Sort { input, .. }
523        | LogicalPlan::Limit { input, .. }
524        | LogicalPlan::Distinct { input } => is_procedure_call(input),
525        _ => false,
526    }
527}
528
529/// Recursively check whether a logical plan contains any write operation.
530///
531/// Subqueries that mutate state must execute once per correlated input row;
532/// the IN-list batching optimization is safe only for read-only subqueries.
533fn plan_contains_writes(plan: &LogicalPlan) -> bool {
534    use crate::query::planner::LogicalPlan as LP;
535    match plan {
536        LP::Create { .. }
537        | LP::CreateBatch { .. }
538        | LP::Merge { .. }
539        | LP::Delete { .. }
540        | LP::Set { .. }
541        | LP::Remove { .. }
542        | LP::Foreach { .. } => true,
543        LP::Project { input, .. }
544        | LP::Filter { input, .. }
545        | LP::Sort { input, .. }
546        | LP::Limit { input, .. }
547        | LP::Distinct { input }
548        | LP::Unwind { input, .. }
549        | LP::Aggregate { input, .. } => plan_contains_writes(input),
550        LP::Apply {
551            input, subquery, ..
552        }
553        | LP::SubqueryCall { input, subquery } => {
554            plan_contains_writes(input) || plan_contains_writes(subquery)
555        }
556        _ => false,
557    }
558}
559
560/// Compute a hash for row parameters to enable deduplication.
561///
562/// Sorts entries by key for deterministic hashing regardless of iteration order.
563fn hash_row_params(params: &HashMap<String, Value>) -> u64 {
564    let mut hasher = DefaultHasher::new();
565    let mut entries: Vec<_> = params.iter().collect();
566    entries.sort_unstable_by_key(|(k, _)| *k);
567    for (key, val) in entries {
568        key.hash(&mut hasher);
569        format!("{val:?}").hash(&mut hasher);
570    }
571    hasher.finish()
572}
573
574/// Check if batching is eligible for this apply operation.
575/// Returns true if:
576/// - There are 2+ filtered entries (single row → existing path)
577/// - At least one `._vid` correlation key exists
578fn is_batch_eligible(filtered_entries: &[(&RecordBatch, usize, HashMap<String, Value>)]) -> bool {
579    if filtered_entries.len() < 2 {
580        return false;
581    }
582
583    // Check if at least one correlation key (._vid) exists
584    filtered_entries
585        .iter()
586        .any(|(_, _, row_params)| row_params.keys().any(|k| k.ends_with("._vid")))
587}
588
589/// Run the apply operation: execute input, filter, correlate subquery, merge results.
590///
591/// Uses Arrow-native row slicing for input columns to preserve complex types
592/// (Struct, List, etc.), and only converts to Value for parameter injection.
593#[expect(clippy::too_many_arguments)]
594async fn run_apply(
595    input_exec: Arc<dyn ExecutionPlan>,
596    subquery_plan: &LogicalPlan,
597    input_filter: Option<&Expr>,
598    graph_ctx: &Arc<GraphExecutionContext>,
599    session_ctx: &Arc<RwLock<SessionContext>>,
600    storage: &Arc<StorageManager>,
601    schema_info: &Arc<UniSchema>,
602    params: &HashMap<String, Value>,
603    output_schema: &SchemaRef,
604    kept_input_indices: &[usize],
605    kept_input_overrides: &[Option<(String, String)>],
606    mutation_ctx: Option<&Arc<MutationContext>>,
607) -> DFResult<RecordBatch> {
608    let apply_start = std::time::Instant::now();
609    let is_proc_call = is_procedure_call(subquery_plan);
610    tracing::debug!("run_apply: is_procedure_call={}", is_proc_call);
611
612    // 1. Execute pre-planned input physical plan directly
613    let task_ctx = session_ctx.read().task_ctx();
614    let input_batches = collect_all_partitions(&input_exec, task_ctx).await?;
615
616    // 2. Collect (batch_ref, row_idx) for rows that pass the input filter,
617    //    along with their Value-based params for subquery injection.
618    let mut filtered_entries: Vec<(&RecordBatch, usize, HashMap<String, Value>)> = Vec::new();
619    for batch in &input_batches {
620        for row_idx in 0..batch.num_rows() {
621            let row_params = extract_row_params(batch, row_idx);
622            if let Some(filter) = input_filter
623                && !evaluate_filter(filter, &row_params)
624            {
625                continue;
626            }
627            filtered_entries.push((batch, row_idx, row_params));
628        }
629    }
630
631    tracing::debug!(
632        "run_apply: filtered_entries count = {}",
633        filtered_entries.len()
634    );
635
636    let subquery_has_writes = plan_contains_writes(subquery_plan);
637
638    // 3. Handle empty input: execute subquery once with base params.
639    //
640    // For unit subqueries (no RETURN, schema has no subquery fields) we skip
641    // the call entirely: with zero outer rows there's nothing to drive
642    // per-row side effects, and correlated parameter resolution would fail
643    // (`Unresolved parameter: $n`). The same logic applies to any
644    // write-bearing subquery — running it once with no outer correlation
645    // would either fail to resolve params or write phantom rows.
646    let is_unit_subquery = output_schema.fields().len() == kept_input_indices.len();
647    if filtered_entries.is_empty() {
648        if is_unit_subquery || subquery_has_writes {
649            return Ok(RecordBatch::new_empty(output_schema.clone()));
650        }
651        let sub_batches = execute_subplan(
652            subquery_plan,
653            params,
654            &HashMap::new(), // No outer values for empty input case
655            graph_ctx,
656            session_ctx,
657            storage,
658            schema_info,
659            mutation_ctx,
660        )
661        .await?;
662        let sub_rows = batches_to_row_maps(&sub_batches);
663        return rows_to_batch(&sub_rows, output_schema);
664    }
665
666    // 4. Check if we can batch the subplan execution
667    // IMPORTANT: Only batch when NOT a procedure call AND has input_filter.
668    // - Procedure calls use outer_values (not params), incompatible with batching
669    // - No input_filter indicates CALL subquery (e.g., MATCH (p) CALL { MATCH (p) })
670    //   which requires per-row correlation, not batching
671    // - Target pattern: procedure call → Apply with filter → MATCH traversal
672    let has_filter = input_filter.is_some();
673
674    if is_batch_eligible(&filtered_entries) && !is_proc_call && has_filter && !subquery_has_writes {
675        tracing::debug!("run_apply: batching eligible, attempting batch execution");
676
677        // Collect unique VID values and build batched params
678        let mut vid_values: HashMap<String, Vec<Value>> = HashMap::new();
679        for (_, _, row_params) in &filtered_entries {
680            for (key, value) in row_params {
681                if key.ends_with("._vid") {
682                    vid_values
683                        .entry(key.clone())
684                        .or_default()
685                        .push(value.clone());
686                }
687            }
688        }
689
690        // Build batched params: VID keys become Value::List
691        let mut batched_params = params.clone();
692        for (key, values) in &vid_values {
693            batched_params.insert(key.clone(), Value::List(values.clone()));
694        }
695
696        // Add carry-through parameters from first row (for literals in projections)
697        // These won't affect the WHERE filter but ensure planning succeeds
698        if let Some((_, _, first_row_params)) = filtered_entries.first() {
699            for (key, value) in first_row_params {
700                if !key.ends_with("._vid") {
701                    batched_params
702                        .entry(key.clone())
703                        .or_insert_with(|| value.clone());
704                }
705            }
706        }
707
708        // Execute subquery ONCE with batched VID params
709        let subplan_start = std::time::Instant::now();
710        let sub_batches = execute_subplan(
711            subquery_plan,
712            &batched_params,
713            &HashMap::new(),
714            graph_ctx,
715            session_ctx,
716            storage,
717            schema_info,
718            mutation_ctx,
719        )
720        .await?;
721        let subplan_elapsed = subplan_start.elapsed();
722        tracing::debug!(
723            "run_apply: batch execute_subplan took {:?}",
724            subplan_elapsed
725        );
726
727        // Build hash index: VID → Vec<subquery result rows>
728        let sub_rows = batches_to_row_maps(&sub_batches);
729        let mut sub_index: HashMap<i64, Vec<&HashMap<String, Value>>> = HashMap::new();
730
731        // Find the VID key (should be the same for all rows)
732        let vid_key = vid_values.keys().next().expect("at least one VID key");
733
734        for sub_row in &sub_rows {
735            if let Some(Value::Int(vid)) = sub_row.get(vid_key) {
736                sub_index.entry(*vid).or_default().push(sub_row);
737            }
738        }
739
740        // Hash-join: for each input row, look up by VID, emit input+subquery columns.
741        // `kept_input_indices` filters out input columns whose names are
742        // overridden by subquery RETURN columns.
743        let num_input_cols = kept_input_indices.len();
744        let num_output_cols = output_schema.fields().len();
745        let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
746
747        for (batch, row_idx, row_params) in &filtered_entries {
748            // Extract VID from row params
749            let input_vid = if let Some(Value::Int(vid)) = row_params.get(vid_key) {
750                *vid
751            } else {
752                continue; // Skip if VID is not present
753            };
754
755            let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
756
757            // Look up matching subquery rows by VID
758            if let Some(matching_sub_rows) = sub_index.get(&input_vid) {
759                for sub_row in matching_sub_rows {
760                    append_cross_join_row(
761                        &mut column_arrays,
762                        &input_row_arrays,
763                        sub_row,
764                        output_schema,
765                        num_input_cols,
766                        kept_input_overrides,
767                        is_unit_subquery,
768                    )?;
769                }
770            } else if is_unit_subquery {
771                // Unit subquery: side effects (writes) have run as part of the
772                // bulk sub-plan execution above; pass the input row through.
773                for (col_idx, arr) in input_row_arrays.iter().enumerate() {
774                    column_arrays[col_idx].push(arr.clone());
775                }
776            }
777            // else: inner join — skip input row (no subquery matches)
778        }
779
780        let result = concat_column_arrays(&column_arrays, output_schema);
781
782        let apply_elapsed = apply_start.elapsed();
783        tracing::debug!(
784            "run_apply: completed (batched) in {:?}, 1 subplan execution",
785            apply_elapsed
786        );
787
788        return result;
789    }
790
791    // 5. Fallback: For each input row, execute subquery and collect output column arrays.
792    //    Used when batching is not eligible (single row, no VID keys, or procedure call).
793    //    Each output row is: surviving input columns (sliced via
794    //    `kept_input_indices`) + subquery columns. Input columns whose name is
795    //    overridden by a subquery RETURN are dropped here so the merged
796    //    `output_schema` matches the data layout.
797    let num_input_cols = kept_input_indices.len();
798    let num_output_cols = output_schema.fields().len();
799    // Accumulate per-column arrays for all output rows
800    let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
801
802    let mut total_subplan_time = std::time::Duration::ZERO;
803    let mut subplan_executions = 0;
804
805    // Cache to deduplicate subplan executions for identical row parameters
806    let mut subplan_cache: HashMap<u64, Vec<HashMap<String, Value>>> = HashMap::new();
807    let mut cache_hits = 0;
808
809    for (batch, row_idx, row_params) in &filtered_entries {
810        // For procedure calls (CALL...YIELD), pass row_params as outer_values to avoid
811        // shadowing user parameters. For regular subqueries (CALL { ... }), merge them
812        // into parameters for backward compatibility with correlated variables.
813        let (sub_params, sub_outer_values) = if is_procedure_call(subquery_plan) {
814            // Procedure call: keep params separate from outer values
815            (params.clone(), row_params.clone())
816        } else {
817            // Regular subquery: merge outer values into params (old behavior)
818            let mut merged = params.clone();
819            merged.extend(row_params.clone());
820            (merged, HashMap::new())
821        };
822
823        // Check cache for identical row params
824        let params_hash = hash_row_params(row_params);
825        let sub_rows = if let Some(cached_rows) = subplan_cache.get(&params_hash) {
826            // Cache hit: reuse previous results
827            cache_hits += 1;
828            tracing::debug!(
829                "run_apply: cache hit for params hash {}, skipping execute_subplan",
830                params_hash
831            );
832            cached_rows.clone()
833        } else {
834            // Cache miss: execute subplan
835            let subplan_start = std::time::Instant::now();
836            let sub_batches = execute_subplan(
837                subquery_plan,
838                &sub_params,
839                &sub_outer_values,
840                graph_ctx,
841                session_ctx,
842                storage,
843                schema_info,
844                mutation_ctx,
845            )
846            .await?;
847            let subplan_elapsed = subplan_start.elapsed();
848            total_subplan_time += subplan_elapsed;
849            subplan_executions += 1;
850
851            tracing::debug!(
852                "run_apply: execute_subplan #{} took {:?}",
853                subplan_executions,
854                subplan_elapsed
855            );
856
857            let rows = batches_to_row_maps(&sub_batches);
858            subplan_cache.insert(params_hash, rows.clone());
859            rows
860        };
861
862        let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
863
864        if sub_rows.is_empty() {
865            if is_unit_subquery {
866                // Unit subquery: side effects have executed; pass the input
867                // row through (no subquery columns to append).
868                for (col_idx, arr) in input_row_arrays.iter().enumerate() {
869                    column_arrays[col_idx].push(arr.clone());
870                }
871            }
872            // else: inner-join semantics — skip this input row.
873            continue;
874        }
875
876        for sub_row in &sub_rows {
877            append_cross_join_row(
878                &mut column_arrays,
879                &input_row_arrays,
880                sub_row,
881                output_schema,
882                num_input_cols,
883                kept_input_overrides,
884                is_unit_subquery,
885            )?;
886        }
887    }
888
889    // 6. Concatenate all accumulated arrays per column
890    let result = concat_column_arrays(&column_arrays, output_schema);
891
892    let apply_elapsed = apply_start.elapsed();
893    tracing::debug!(
894        "run_apply: completed in {:?}, {} subplan executions, {} cache hits, {:?} total subplan time",
895        apply_elapsed,
896        subplan_executions,
897        cache_hits,
898        total_subplan_time
899    );
900
901    result
902}
903
904/// Build a single-row Arrow array from a builder and optional value.
905fn single_row_array<B, T>(mut builder: B, val: Option<T>) -> ArrayRef
906where
907    B: PrimitiveAppend<T>,
908{
909    match val {
910        Some(v) => builder.append_typed_value(v),
911        None => builder.append_typed_null(),
912    }
913    builder.finish_to_array()
914}
915
916/// Convert a single Value to a single-row Arrow array of the given type.
917fn value_to_single_row_array(val: &Value, data_type: &DataType) -> DFResult<ArrayRef> {
918    Ok(match data_type {
919        DataType::UInt64 => single_row_array(
920            UInt64Builder::with_capacity(1),
921            val.as_u64().or_else(|| val.as_i64().map(|v| v as u64)),
922        ),
923        DataType::Int64 => single_row_array(Int64Builder::with_capacity(1), val.as_i64()),
924        DataType::Int32 => single_row_array(
925            Int32Builder::with_capacity(1),
926            val.as_i64().map(|v| v as i32),
927        ),
928        DataType::Float64 => single_row_array(Float64Builder::with_capacity(1), val.as_f64()),
929        DataType::Boolean => single_row_array(BooleanBuilder::with_capacity(1), val.as_bool()),
930        DataType::Null => Arc::new(arrow_array::NullArray::new(1)) as ArrayRef,
931        DataType::LargeBinary => {
932            let mut b = arrow_array::builder::LargeBinaryBuilder::with_capacity(1, 64);
933            if val.is_null() {
934                b.append_null();
935            } else {
936                let cv_bytes = uni_common::cypher_value_codec::encode(val);
937                b.append_value(&cv_bytes);
938            }
939            Arc::new(b.finish()) as ArrayRef
940        }
941        DataType::Utf8 => {
942            let mut b = StringBuilder::with_capacity(1, 64);
943            match val {
944                Value::Null => b.append_null(),
945                Value::String(s) => b.append_value(s),
946                other => b.append_value(format!("{other}")),
947            }
948            Arc::new(b.finish()) as ArrayRef
949        }
950        DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
951            let mut b = arrow_array::builder::ListBuilder::new(StringBuilder::new());
952            match val {
953                Value::List(items) => {
954                    for item in items {
955                        match item {
956                            Value::String(s) => b.values().append_value(s),
957                            Value::Null => b.values().append_null(),
958                            other => b.values().append_value(format!("{other}")),
959                        }
960                    }
961                    b.append(true);
962                }
963                Value::Null => b.append_null(),
964                other => {
965                    b.values().append_value(format!("{other}"));
966                    b.append(true);
967                }
968            }
969            Arc::new(b.finish()) as ArrayRef
970        }
971        DataType::Struct(fields) => {
972            // Encode a graph entity (`Value::Map` / `Value::Node` / `Value::Edge`)
973            // into a single-row StructArray matching the declared field
974            // layout. Used by the unit-subquery refresh path so the bare
975            // entity column reflects post-SET state — `compile_property_access`
976            // (expr_compiler.rs) tries struct-field extraction before flat
977            // columns, so a stale Struct would shadow our refreshed dotted
978            // columns.
979            let map_view: Option<&HashMap<String, Value>> = match val {
980                Value::Map(m) => Some(m),
981                Value::Node(n) => Some(&n.properties),
982                Value::Edge(e) => Some(&e.properties),
983                _ => None,
984            };
985            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(fields.len());
986            for child_field in fields.iter() {
987                let child_val = map_view
988                    .and_then(|m| m.get(child_field.name()))
989                    .cloned()
990                    .unwrap_or(Value::Null);
991                child_arrays.push(value_to_single_row_array(
992                    &child_val,
993                    child_field.data_type(),
994                )?);
995            }
996            let pairs: Vec<(Arc<arrow_schema::Field>, ArrayRef)> =
997                fields.iter().cloned().zip(child_arrays).collect();
998            Arc::new(arrow_array::StructArray::from(pairs)) as ArrayRef
999        }
1000        _ => {
1001            debug_assert!(
1002                false,
1003                "value_to_single_row_array: unhandled DataType {:?} — mirror the arm in rows_to_batch",
1004                data_type
1005            );
1006            let mut b = StringBuilder::with_capacity(1, 64);
1007            match val {
1008                Value::Null => b.append_null(),
1009                Value::String(s) => b.append_value(s),
1010                other => b.append_value(format!("{other}")),
1011            }
1012            Arc::new(b.finish()) as ArrayRef
1013        }
1014    })
1015}
1016
1017/// Append one cross-joined row (input + subquery) to the per-column accumulator.
1018///
1019/// Input columns use Arrow-native sliced arrays to preserve complex types,
1020/// EXCEPT:
1021///   * `kept_input_overrides[i] = Some((var, prop))` — refresh `var.prop`
1022///     from the subquery's post-SET bare `var` Map in `sub_row` so dotted
1023///     columns surface fresh values across the Apply boundary.
1024///   * When `is_unit_subquery` is true, ALSO refresh any kept input column
1025///     whose name appears as a key in `sub_row`. Unit subqueries (no
1026///     RETURN, write-only side effects) re-emit the modified outer row
1027///     under the SAME column names; using the sub_row value gives outer
1028///     `RETURN v.prop` the post-SET binding even though the unit subquery
1029///     contributes no explicit RETURN fields.
1030///
1031/// Subquery columns convert `Value` to single-row Arrow arrays as before.
1032fn append_cross_join_row(
1033    column_arrays: &mut [Vec<ArrayRef>],
1034    input_row_arrays: &[ArrayRef],
1035    sub_row: &HashMap<String, Value>,
1036    output_schema: &SchemaRef,
1037    num_input_cols: usize,
1038    kept_input_overrides: &[Option<(String, String)>],
1039    is_unit_subquery: bool,
1040) -> DFResult<()> {
1041    // Add input columns (Arrow-native), with per-column refresh from sub_row
1042    // when applicable (see fn-doc).
1043    for (col_idx, arr) in input_row_arrays.iter().enumerate() {
1044        if let Some(Some((var, prop))) = kept_input_overrides.get(col_idx) {
1045            let extracted = match sub_row.get(var) {
1046                Some(Value::Map(m)) => m.get(prop).cloned().unwrap_or(Value::Null),
1047                Some(Value::Node(n)) => n.properties.get(prop).cloned().unwrap_or(Value::Null),
1048                Some(Value::Edge(e)) => e.properties.get(prop).cloned().unwrap_or(Value::Null),
1049                _ => Value::Null,
1050            };
1051            let field = &output_schema.fields()[col_idx];
1052            let new_arr = value_to_single_row_array(&extracted, field.data_type())?;
1053            column_arrays[col_idx].push(new_arr);
1054            continue;
1055        }
1056        if is_unit_subquery {
1057            // Refresh the kept input column from the subquery's post-SET
1058            // sub_row. Two cases:
1059            //   * Dotted (`v.prop`): extract `prop` from `sub_row[v]` (the
1060            //     subquery emits the modified bare Map under key `v`, not
1061            //     as dotted columns).
1062            //   * Bare (`v`): replace with `sub_row[v]` itself, encoded
1063            //     into the field's declared type. This covers the Struct
1064            //     case (`value_to_single_row_array` now has a Struct arm)
1065            //     and is required because `compile_property_access` in
1066            //     `expr_compiler.rs` tries Struct-field extraction BEFORE
1067            //     the flat-column fallback — a stale Struct would shadow
1068            //     refreshed dotted columns.
1069            let field = &output_schema.fields()[col_idx];
1070            let refreshed: Option<Value> = if let Some(dot) = field.name().find('.') {
1071                let base = &field.name()[..dot];
1072                let prop = &field.name()[dot + 1..];
1073                match sub_row.get(base) {
1074                    Some(Value::Map(m)) => m.get(prop).cloned(),
1075                    Some(Value::Node(n)) => n.properties.get(prop).cloned(),
1076                    Some(Value::Edge(e)) => e.properties.get(prop).cloned(),
1077                    _ => None,
1078                }
1079            } else {
1080                sub_row.get(field.name()).cloned()
1081            };
1082            if let Some(val) = refreshed {
1083                let new_arr = value_to_single_row_array(&val, field.data_type())?;
1084                column_arrays[col_idx].push(new_arr);
1085                continue;
1086            }
1087        }
1088        column_arrays[col_idx].push(arr.clone());
1089    }
1090
1091    // Add subquery columns using Value -> Arrow conversion
1092    let num_output_cols = output_schema.fields().len();
1093    for (col_arr, field) in column_arrays[num_input_cols..num_output_cols]
1094        .iter_mut()
1095        .zip(output_schema.fields()[num_input_cols..num_output_cols].iter())
1096    {
1097        let col_name = field.name();
1098        let val = sub_row.get(col_name).cloned().unwrap_or(Value::Null);
1099        let arr = value_to_single_row_array(&val, field.data_type())?;
1100        col_arr.push(arr);
1101    }
1102    Ok(())
1103}
1104
1105/// Concatenate per-column array accumulators into a single `RecordBatch`.
1106///
1107/// Returns an empty batch if no rows were accumulated.
1108fn concat_column_arrays(
1109    column_arrays: &[Vec<ArrayRef>],
1110    output_schema: &SchemaRef,
1111) -> DFResult<RecordBatch> {
1112    if column_arrays[0].is_empty() {
1113        return Ok(RecordBatch::new_empty(output_schema.clone()));
1114    }
1115
1116    let mut final_columns: Vec<ArrayRef> = Vec::with_capacity(column_arrays.len());
1117    for arrays in column_arrays {
1118        let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(|a| a.as_ref()).collect();
1119        let concatenated = arrow::compute::concat(&refs).map_err(arrow_err)?;
1120        final_columns.push(concatenated);
1121    }
1122
1123    RecordBatch::try_new(output_schema.clone(), final_columns).map_err(arrow_err)
1124}
1125
1126// ---------------------------------------------------------------------------
1127// Stream implementation
1128// ---------------------------------------------------------------------------
1129
1130/// Stream state for the apply operation.
1131enum ApplyStreamState {
1132    /// The apply computation is running.
1133    Running(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
1134    /// Computation completed.
1135    Done,
1136}
1137
1138/// Stream that runs the apply operation and emits the result.
1139struct ApplyStream {
1140    state: ApplyStreamState,
1141    schema: SchemaRef,
1142    metrics: BaselineMetrics,
1143}
1144
1145impl Stream for ApplyStream {
1146    type Item = DFResult<RecordBatch>;
1147
1148    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1149        let metrics = self.metrics.clone();
1150        let _timer = metrics.elapsed_compute().timer();
1151        match &mut self.state {
1152            ApplyStreamState::Running(fut) => match fut.as_mut().poll(cx) {
1153                Poll::Ready(Ok(batch)) => {
1154                    self.metrics.record_output(batch.num_rows());
1155                    self.state = ApplyStreamState::Done;
1156                    Poll::Ready(Some(Ok(batch)))
1157                }
1158                Poll::Ready(Err(e)) => {
1159                    self.state = ApplyStreamState::Done;
1160                    Poll::Ready(Some(Err(e)))
1161                }
1162                Poll::Pending => Poll::Pending,
1163            },
1164            ApplyStreamState::Done => Poll::Ready(None),
1165        }
1166    }
1167}
1168
1169impl RecordBatchStream for ApplyStream {
1170    fn schema(&self) -> SchemaRef {
1171        self.schema.clone()
1172    }
1173}