Skip to main content

uni_query/query/df_graph/
bind_zero_length_path.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Zero-length path binding execution plan for DataFusion.
5//!
6//! This module provides [`BindZeroLengthPathExec`], a DataFusion [`ExecutionPlan`] that
7//! converts a single-node pattern `p = (a)` into a Path with one node and zero edges.
8//!
9//! # Example
10//!
11//! ```text
12//! Input:   [{"a._vid": 1, "a._label": "Person", ...}]
13//! Bind:    p = (a)
14//! Output:  [{"a._vid": 1, "a._label": "Person", ..., "p": Path{nodes: [node1], edges: []}}]
15//! ```
16
17use super::GraphExecutionContext;
18use super::common::compute_plan_properties;
19use arrow_array::builder::{
20    LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
21};
22use arrow_array::{ArrayRef, RecordBatch};
23use arrow_schema::SchemaRef;
24use datafusion::common::Result as DFResult;
25use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
26use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
28use futures::{Stream, StreamExt};
29use std::any::Any;
30use std::fmt;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34/// Execution plan that binds a zero-length path for single-node patterns.
35///
36/// For patterns like `p = (a)`, this creates a Path struct with one node
37/// (from the bound node variable) and zero edges.
38pub struct BindZeroLengthPathExec {
39    /// Input execution plan.
40    input: Arc<dyn ExecutionPlan>,
41
42    /// Node variable name (e.g., "a" in `p = (a)`).
43    node_variable: String,
44
45    /// Path variable name (e.g., "p" in `p = (a)`).
46    path_variable: String,
47
48    /// Graph execution context for property/label lookup.
49    graph_ctx: Arc<GraphExecutionContext>,
50
51    /// Output schema.
52    schema: SchemaRef,
53
54    /// Cached plan properties.
55    properties: PlanProperties,
56
57    /// Execution metrics.
58    metrics: ExecutionPlanMetricsSet,
59}
60
61impl fmt::Debug for BindZeroLengthPathExec {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        f.debug_struct("BindZeroLengthPathExec")
64            .field("node_variable", &self.node_variable)
65            .field("path_variable", &self.path_variable)
66            .finish()
67    }
68}
69
70impl BindZeroLengthPathExec {
71    /// Create a new zero-length path binding execution plan.
72    ///
73    /// # Arguments
74    ///
75    /// * `input` - Input plan providing rows with the node variable
76    /// * `node_variable` - Variable name of the bound node
77    /// * `path_variable` - Variable name for the path
78    /// * `graph_ctx` - Graph context for property/label lookups
79    pub fn new(
80        input: Arc<dyn ExecutionPlan>,
81        node_variable: String,
82        path_variable: String,
83        graph_ctx: Arc<GraphExecutionContext>,
84    ) -> Self {
85        let schema = super::common::extend_schema_with_path(input.schema(), &path_variable);
86        let properties = compute_plan_properties(schema.clone());
87
88        Self {
89            input,
90            node_variable,
91            path_variable,
92            graph_ctx,
93            schema,
94            properties,
95            metrics: ExecutionPlanMetricsSet::new(),
96        }
97    }
98}
99
100impl DisplayAs for BindZeroLengthPathExec {
101    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        write!(
103            f,
104            "BindZeroLengthPathExec: {} = ({})",
105            self.path_variable, self.node_variable
106        )
107    }
108}
109
110impl ExecutionPlan for BindZeroLengthPathExec {
111    fn name(&self) -> &str {
112        "BindZeroLengthPathExec"
113    }
114
115    fn as_any(&self) -> &dyn Any {
116        self
117    }
118
119    fn schema(&self) -> SchemaRef {
120        self.schema.clone()
121    }
122
123    fn properties(&self) -> &PlanProperties {
124        &self.properties
125    }
126
127    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
128        vec![&self.input]
129    }
130
131    fn with_new_children(
132        self: Arc<Self>,
133        children: Vec<Arc<dyn ExecutionPlan>>,
134    ) -> DFResult<Arc<dyn ExecutionPlan>> {
135        if children.len() != 1 {
136            return Err(datafusion::error::DataFusionError::Plan(
137                "BindZeroLengthPathExec requires exactly one child".to_string(),
138            ));
139        }
140
141        Ok(Arc::new(Self::new(
142            children[0].clone(),
143            self.node_variable.clone(),
144            self.path_variable.clone(),
145            self.graph_ctx.clone(),
146        )))
147    }
148
149    fn execute(
150        &self,
151        partition: usize,
152        context: Arc<TaskContext>,
153    ) -> DFResult<SendableRecordBatchStream> {
154        let input_stream = self.input.execute(partition, context)?;
155        let metrics = BaselineMetrics::new(&self.metrics, partition);
156
157        Ok(Box::pin(BindZeroLengthPathStream {
158            input: input_stream,
159            node_variable: self.node_variable.clone(),
160            schema: self.schema.clone(),
161            graph_ctx: self.graph_ctx.clone(),
162            metrics,
163        }))
164    }
165
166    fn metrics(&self) -> Option<MetricsSet> {
167        Some(self.metrics.clone_inner())
168    }
169}
170
171/// Stream that performs the zero-length path binding.
172struct BindZeroLengthPathStream {
173    /// Input stream.
174    input: SendableRecordBatchStream,
175
176    /// Node variable name.
177    node_variable: String,
178
179    /// Output schema.
180    schema: SchemaRef,
181
182    /// Graph context for lookups.
183    graph_ctx: Arc<GraphExecutionContext>,
184
185    /// Metrics.
186    metrics: BaselineMetrics,
187}
188
189use super::common::extract_column_value;
190
191impl BindZeroLengthPathStream {
192    /// Process a single input batch.
193    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
194        let num_rows = batch.num_rows();
195        let query_ctx = self.graph_ctx.query_context();
196
197        let vid_col_name = format!("{}._vid", self.node_variable);
198
199        // Create builders for nodes and empty edges
200        let node_struct_fields = super::common::node_struct_fields();
201        let edge_struct_fields = super::common::edge_struct_fields();
202
203        let mut nodes_builder = ListBuilder::new(StructBuilder::new(
204            node_struct_fields,
205            vec![
206                Box::new(UInt64Builder::new()),
207                Box::new(ListBuilder::new(StringBuilder::new())),
208                Box::new(LargeBinaryBuilder::new()),
209            ],
210        ));
211        let mut rels_builder = ListBuilder::new(StructBuilder::from_fields(edge_struct_fields, 0));
212        let mut path_validity = Vec::with_capacity(num_rows);
213
214        for row_idx in 0..num_rows {
215            let vid: Option<uni_common::core::id::Vid> = extract_column_value(
216                &batch,
217                &vid_col_name,
218                row_idx,
219                |arr: &arrow_array::UInt64Array, i| uni_common::core::id::Vid::from(arr.value(i)),
220            );
221
222            if vid.is_none() {
223                nodes_builder.append(false);
224                rels_builder.append(false);
225                path_validity.push(false);
226                continue;
227            }
228
229            super::common::append_node_to_struct_optional(nodes_builder.values(), vid, &query_ctx);
230            nodes_builder.append(true);
231            rels_builder.append(true);
232            path_validity.push(true);
233        }
234
235        let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
236        let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
237
238        let path_array =
239            super::common::build_path_struct_array(nodes_array, rels_array, path_validity)?;
240
241        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
242        columns.push(Arc::new(path_array));
243
244        Ok(RecordBatch::try_new(self.schema.clone(), columns)?)
245    }
246}
247
248impl Stream for BindZeroLengthPathStream {
249    type Item = DFResult<RecordBatch>;
250
251    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252        match self.input.poll_next_unpin(cx) {
253            Poll::Ready(Some(Ok(batch))) => {
254                let _timer = self.metrics.elapsed_compute().timer();
255                let result = self.process_batch(batch);
256                Poll::Ready(Some(result))
257            }
258            other => other,
259        }
260    }
261}
262
263impl RecordBatchStream for BindZeroLengthPathStream {
264    fn schema(&self) -> SchemaRef {
265        self.schema.clone()
266    }
267}