// File: uni_query/query/df_graph/bind_fixed_path.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Fixed-length path binding execution plan for DataFusion.
//!
//! This module provides [`BindFixedPathExec`], a DataFusion [`ExecutionPlan`] that
//! synthesizes a path struct from existing node and edge columns in the batch.
//!
//! Used for patterns like `p = (a)-[r]->(b)` or `p = (a)-[r1]->(b)-[r2]->(c)`
//! where the traversals are single-hop and the path variable needs to be materialized.

12use super::GraphExecutionContext;
13use super::common::{arrow_err, compute_plan_properties, extract_column_value};
14use arrow_array::builder::{
15    LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
16};
17use arrow_array::{ArrayRef, RecordBatch};
18use arrow_schema::SchemaRef;
19use datafusion::common::Result as DFResult;
20use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
21use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
22use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
23use futures::{Stream, StreamExt};
24use std::any::Any;
25use std::fmt;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use uni_common::core::id::{Eid, Vid};
30
31/// Execution plan that binds a fixed-length path from existing node/edge columns.
32///
33/// For patterns like `p = (a)-[r]->(b)` or `p = (a)-[r1]->(b)-[r2]->(c)`,
34/// this creates a Path struct with nodes and relationships from the already-computed
35/// columns in the input batch.
36pub struct BindFixedPathExec {
37    /// Input execution plan.
38    input: Arc<dyn ExecutionPlan>,
39
40    /// Node variable names in path order (e.g., ["a", "b"] or ["a", "b", "c"]).
41    node_variables: Vec<String>,
42
43    /// Edge variable names in path order (e.g., ["r"] or ["r1", "r2"]).
44    edge_variables: Vec<String>,
45
46    /// Path variable name (e.g., "p" in `p = (a)-[r]->(b)`).
47    path_variable: String,
48
49    /// Graph execution context for property/label lookup.
50    graph_ctx: Arc<GraphExecutionContext>,
51
52    /// Output schema.
53    schema: SchemaRef,
54
55    /// Cached plan properties.
56    properties: PlanProperties,
57
58    /// Execution metrics.
59    metrics: ExecutionPlanMetricsSet,
60}
61
62impl fmt::Debug for BindFixedPathExec {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("BindFixedPathExec")
65            .field("node_variables", &self.node_variables)
66            .field("edge_variables", &self.edge_variables)
67            .field("path_variable", &self.path_variable)
68            .finish()
69    }
70}
71
72impl BindFixedPathExec {
73    pub fn new(
74        input: Arc<dyn ExecutionPlan>,
75        node_variables: Vec<String>,
76        edge_variables: Vec<String>,
77        path_variable: String,
78        graph_ctx: Arc<GraphExecutionContext>,
79    ) -> Self {
80        let schema = super::common::extend_schema_with_path(input.schema(), &path_variable);
81        let properties = compute_plan_properties(schema.clone());
82
83        Self {
84            input,
85            node_variables,
86            edge_variables,
87            path_variable,
88            graph_ctx,
89            schema,
90            properties,
91            metrics: ExecutionPlanMetricsSet::new(),
92        }
93    }
94}
95
96impl DisplayAs for BindFixedPathExec {
97    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        write!(
99            f,
100            "BindFixedPathExec: {} = ({}) via [{}]",
101            self.path_variable,
102            self.node_variables.join(", "),
103            self.edge_variables.join(", "),
104        )
105    }
106}
107
108impl ExecutionPlan for BindFixedPathExec {
109    fn name(&self) -> &str {
110        "BindFixedPathExec"
111    }
112
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    fn schema(&self) -> SchemaRef {
118        self.schema.clone()
119    }
120
121    fn properties(&self) -> &PlanProperties {
122        &self.properties
123    }
124
125    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
126        vec![&self.input]
127    }
128
129    fn with_new_children(
130        self: Arc<Self>,
131        children: Vec<Arc<dyn ExecutionPlan>>,
132    ) -> DFResult<Arc<dyn ExecutionPlan>> {
133        if children.len() != 1 {
134            return Err(datafusion::error::DataFusionError::Plan(
135                "BindFixedPathExec requires exactly one child".to_string(),
136            ));
137        }
138
139        Ok(Arc::new(Self::new(
140            children[0].clone(),
141            self.node_variables.clone(),
142            self.edge_variables.clone(),
143            self.path_variable.clone(),
144            self.graph_ctx.clone(),
145        )))
146    }
147
148    fn execute(
149        &self,
150        partition: usize,
151        context: Arc<TaskContext>,
152    ) -> DFResult<SendableRecordBatchStream> {
153        let input_stream = self.input.execute(partition, context)?;
154        let metrics = BaselineMetrics::new(&self.metrics, partition);
155
156        Ok(Box::pin(BindFixedPathStream {
157            input: input_stream,
158            node_variables: self.node_variables.clone(),
159            edge_variables: self.edge_variables.clone(),
160            schema: self.schema.clone(),
161            graph_ctx: self.graph_ctx.clone(),
162            metrics,
163        }))
164    }
165
166    fn metrics(&self) -> Option<MetricsSet> {
167        Some(self.metrics.clone_inner())
168    }
169}
170
171/// Stream that synthesizes path structs from existing node/edge columns.
172struct BindFixedPathStream {
173    input: SendableRecordBatchStream,
174    node_variables: Vec<String>,
175    edge_variables: Vec<String>,
176    schema: SchemaRef,
177    graph_ctx: Arc<GraphExecutionContext>,
178    metrics: BaselineMetrics,
179}
180
181impl BindFixedPathStream {
182    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
183        let num_rows = batch.num_rows();
184        let query_ctx = self.graph_ctx.query_context();
185
186        // Build node and edge struct fields
187        let node_struct_fields = super::common::node_struct_fields();
188        let edge_struct_fields = super::common::edge_struct_fields();
189
190        let mut nodes_builder = ListBuilder::new(StructBuilder::new(
191            node_struct_fields,
192            vec![
193                Box::new(UInt64Builder::new()),
194                Box::new(ListBuilder::new(StringBuilder::new())),
195                Box::new(LargeBinaryBuilder::new()),
196            ],
197        ));
198        let mut rels_builder = ListBuilder::new(StructBuilder::from_fields(
199            edge_struct_fields,
200            num_rows * self.edge_variables.len(),
201        ));
202        let mut path_validity = Vec::with_capacity(num_rows);
203
204        for row_idx in 0..num_rows {
205            // A fixed path is NULL if any required node or edge binding is missing.
206            let row_has_missing_node = self.node_variables.iter().any(|node_var| {
207                let vid_col_name = format!("{}._vid", node_var);
208                extract_column_value::<arrow_array::UInt64Array, u64>(
209                    &batch,
210                    &vid_col_name,
211                    row_idx,
212                    |arr, i| arr.value(i),
213                )
214                .is_none()
215            });
216            let row_has_missing_edge = self.edge_variables.iter().any(|edge_var| {
217                let eid_col_name = if edge_var.starts_with("__eid_to_") {
218                    edge_var.clone()
219                } else {
220                    format!("{}._eid", edge_var)
221                };
222                extract_column_value::<arrow_array::UInt64Array, u64>(
223                    &batch,
224                    &eid_col_name,
225                    row_idx,
226                    |arr, i| arr.value(i),
227                )
228                .is_none()
229            });
230
231            if row_has_missing_node || row_has_missing_edge {
232                nodes_builder.append(false);
233                rels_builder.append(false);
234                path_validity.push(false);
235                continue;
236            }
237
238            // Add all nodes in path order
239            for node_var in &self.node_variables {
240                let vid_col_name = format!("{}._vid", node_var);
241
242                let vid: Option<Vid> = extract_column_value(
243                    &batch,
244                    &vid_col_name,
245                    row_idx,
246                    |arr: &arrow_array::UInt64Array, i| Vid::from(arr.value(i)),
247                );
248
249                super::common::append_node_to_struct_optional(
250                    nodes_builder.values(),
251                    vid,
252                    &query_ctx,
253                );
254            }
255            nodes_builder.append(true);
256
257            // Add all edges in path order
258            // Edge i connects node_variables[i] to node_variables[i+1]
259            for (edge_idx, edge_var) in self.edge_variables.iter().enumerate() {
260                // Internal tracking columns like __eid_to_b are the column name directly;
261                // named edge variables use {var}._eid format
262                let eid_col_name = if edge_var.starts_with("__eid_to_") {
263                    edge_var.clone()
264                } else {
265                    format!("{}._eid", edge_var)
266                };
267
268                let eid: Option<Eid> = extract_column_value(
269                    &batch,
270                    &eid_col_name,
271                    row_idx,
272                    |arr: &arrow_array::UInt64Array, i| Eid::from(arr.value(i)),
273                );
274
275                // Try to get the edge type name from the batch column (populated by
276                // GraphTraverseExec from the schema). This is the primary source;
277                // L0 lookup is only a fallback for in-memory mutations.
278                let batch_type_name: Option<String> = if !edge_var.starts_with("__eid_to_") {
279                    let type_col_name = format!("{}._type", edge_var);
280                    extract_column_value(
281                        &batch,
282                        &type_col_name,
283                        row_idx,
284                        |arr: &arrow_array::StringArray, i| arr.value(i).to_string(),
285                    )
286                } else {
287                    None
288                };
289
290                // Get src/dst VIDs from adjacent node variables
291                let src_vid = self
292                    .node_variables
293                    .get(edge_idx)
294                    .and_then(|nv| {
295                        let col = format!("{}._vid", nv);
296                        extract_column_value::<arrow_array::UInt64Array, u64>(
297                            &batch,
298                            &col,
299                            row_idx,
300                            |arr, i| arr.value(i),
301                        )
302                    })
303                    .unwrap_or(0);
304                let dst_vid = self
305                    .node_variables
306                    .get(edge_idx + 1)
307                    .and_then(|nv| {
308                        let col = format!("{}._vid", nv);
309                        extract_column_value::<arrow_array::UInt64Array, u64>(
310                            &batch,
311                            &col,
312                            row_idx,
313                            |arr, i| arr.value(i),
314                        )
315                    })
316                    .unwrap_or(0);
317
318                super::common::append_edge_to_struct_optional(
319                    rels_builder.values(),
320                    eid,
321                    src_vid,
322                    dst_vid,
323                    batch_type_name,
324                    &query_ctx,
325                );
326            }
327            rels_builder.append(true);
328            path_validity.push(true);
329        }
330
331        let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
332        let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
333
334        let path_array =
335            super::common::build_path_struct_array(nodes_array, rels_array, path_validity)?;
336
337        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
338        columns.push(Arc::new(path_array));
339
340        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
341    }
342}
343
344impl Stream for BindFixedPathStream {
345    type Item = DFResult<RecordBatch>;
346
347    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
348        match self.input.poll_next_unpin(cx) {
349            Poll::Ready(Some(Ok(batch))) => {
350                let _timer = self.metrics.elapsed_compute().timer();
351                let result = self.process_batch(batch);
352                Poll::Ready(Some(result))
353            }
354            other => other,
355        }
356    }
357}
358
359impl RecordBatchStream for BindFixedPathStream {
360    fn schema(&self) -> SchemaRef {
361        self.schema.clone()
362    }
363}