uni_query/query/df_graph/
bind_zero_length_path.rs1use super::GraphExecutionContext;
18use super::common::compute_plan_properties;
19use arrow_array::builder::{
20 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
21};
22use arrow_array::{ArrayRef, RecordBatch};
23use arrow_schema::SchemaRef;
24use datafusion::common::Result as DFResult;
25use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
26use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
28use futures::{Stream, StreamExt};
29use std::any::Any;
30use std::fmt;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
/// Physical operator that binds a zero-length path for every input row.
///
/// For each row, the node whose id is read from the `{node_variable}._vid` column becomes a
/// path containing exactly that node and no relationships. That path is appended to the row
/// as an extra column. Rows for which no node id can be read get a null path.
pub struct BindZeroLengthPathExec {
    /// Child plan whose rows are extended with the path column.
    input: Arc<dyn ExecutionPlan>,

    /// Node variable whose `_vid` column (in the input rows) identifies the path's only node.
    node_variable: String,

    /// Name of the path variable being bound; used to extend the output schema and in
    /// the plan's display string.
    path_variable: String,

    /// Graph execution context shared with every stream this operator produces.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema: the input schema extended via `extend_schema_with_path`.
    schema: SchemaRef,

    /// Plan properties computed from `schema` when the operator is constructed.
    /// NOTE(review): `execute` forwards the partition index straight to `input`; confirm
    /// `compute_plan_properties` reports the same partition count as the child.
    properties: PlanProperties,

    /// Metrics registry; each executed partition registers its baseline metrics here.
    metrics: ExecutionPlanMetricsSet,
}
60
61impl fmt::Debug for BindZeroLengthPathExec {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 f.debug_struct("BindZeroLengthPathExec")
64 .field("node_variable", &self.node_variable)
65 .field("path_variable", &self.path_variable)
66 .finish()
67 }
68}
69
70impl BindZeroLengthPathExec {
71 pub fn new(
80 input: Arc<dyn ExecutionPlan>,
81 node_variable: String,
82 path_variable: String,
83 graph_ctx: Arc<GraphExecutionContext>,
84 ) -> Self {
85 let schema = super::common::extend_schema_with_path(input.schema(), &path_variable);
86 let properties = compute_plan_properties(schema.clone());
87
88 Self {
89 input,
90 node_variable,
91 path_variable,
92 graph_ctx,
93 schema,
94 properties,
95 metrics: ExecutionPlanMetricsSet::new(),
96 }
97 }
98}
99
100impl DisplayAs for BindZeroLengthPathExec {
101 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 write!(
103 f,
104 "BindZeroLengthPathExec: {} = ({})",
105 self.path_variable, self.node_variable
106 )
107 }
108}
109
110impl ExecutionPlan for BindZeroLengthPathExec {
111 fn name(&self) -> &str {
112 "BindZeroLengthPathExec"
113 }
114
115 fn as_any(&self) -> &dyn Any {
116 self
117 }
118
119 fn schema(&self) -> SchemaRef {
120 self.schema.clone()
121 }
122
123 fn properties(&self) -> &PlanProperties {
124 &self.properties
125 }
126
127 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
128 vec![&self.input]
129 }
130
131 fn with_new_children(
132 self: Arc<Self>,
133 children: Vec<Arc<dyn ExecutionPlan>>,
134 ) -> DFResult<Arc<dyn ExecutionPlan>> {
135 if children.len() != 1 {
136 return Err(datafusion::error::DataFusionError::Plan(
137 "BindZeroLengthPathExec requires exactly one child".to_string(),
138 ));
139 }
140
141 Ok(Arc::new(Self::new(
142 children[0].clone(),
143 self.node_variable.clone(),
144 self.path_variable.clone(),
145 self.graph_ctx.clone(),
146 )))
147 }
148
149 fn execute(
150 &self,
151 partition: usize,
152 context: Arc<TaskContext>,
153 ) -> DFResult<SendableRecordBatchStream> {
154 let input_stream = self.input.execute(partition, context)?;
155 let metrics = BaselineMetrics::new(&self.metrics, partition);
156
157 Ok(Box::pin(BindZeroLengthPathStream {
158 input: input_stream,
159 node_variable: self.node_variable.clone(),
160 schema: self.schema.clone(),
161 graph_ctx: self.graph_ctx.clone(),
162 metrics,
163 }))
164 }
165
166 fn metrics(&self) -> Option<MetricsSet> {
167 Some(self.metrics.clone_inner())
168 }
169}
170
/// Stream for a single partition that appends a zero-length path column to each input batch.
struct BindZeroLengthPathStream {
    /// Batches from the corresponding partition of the child plan.
    input: SendableRecordBatchStream,

    /// Node variable; the per-row node id is read from the `{node_variable}._vid` column.
    node_variable: String,

    /// Output schema: the input columns followed by the path column.
    schema: SchemaRef,

    /// Graph execution context; its query context is passed to node-struct construction.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Baseline metrics for this partition (compute time is recorded while processing batches).
    metrics: BaselineMetrics,
}
188
189use super::common::extract_column_value;
190
191impl BindZeroLengthPathStream {
192 fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
194 let num_rows = batch.num_rows();
195 let query_ctx = self.graph_ctx.query_context();
196
197 let vid_col_name = format!("{}._vid", self.node_variable);
198
199 let node_struct_fields = super::common::node_struct_fields();
201 let edge_struct_fields = super::common::edge_struct_fields();
202
203 let mut nodes_builder = ListBuilder::new(StructBuilder::new(
204 node_struct_fields,
205 vec![
206 Box::new(UInt64Builder::new()),
207 Box::new(ListBuilder::new(StringBuilder::new())),
208 Box::new(LargeBinaryBuilder::new()),
209 ],
210 ));
211 let mut rels_builder = ListBuilder::new(StructBuilder::from_fields(edge_struct_fields, 0));
212 let mut path_validity = Vec::with_capacity(num_rows);
213
214 for row_idx in 0..num_rows {
215 let vid: Option<uni_common::core::id::Vid> = extract_column_value(
216 &batch,
217 &vid_col_name,
218 row_idx,
219 |arr: &arrow_array::UInt64Array, i| uni_common::core::id::Vid::from(arr.value(i)),
220 );
221
222 if vid.is_none() {
223 nodes_builder.append(false);
224 rels_builder.append(false);
225 path_validity.push(false);
226 continue;
227 }
228
229 super::common::append_node_to_struct_optional(nodes_builder.values(), vid, &query_ctx);
230 nodes_builder.append(true);
231 rels_builder.append(true);
232 path_validity.push(true);
233 }
234
235 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
236 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
237
238 let path_array =
239 super::common::build_path_struct_array(nodes_array, rels_array, path_validity)?;
240
241 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
242 columns.push(Arc::new(path_array));
243
244 Ok(RecordBatch::try_new(self.schema.clone(), columns)?)
245 }
246}
247
248impl Stream for BindZeroLengthPathStream {
249 type Item = DFResult<RecordBatch>;
250
251 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252 match self.input.poll_next_unpin(cx) {
253 Poll::Ready(Some(Ok(batch))) => {
254 let _timer = self.metrics.elapsed_compute().timer();
255 let result = self.process_batch(batch);
256 Poll::Ready(Some(result))
257 }
258 other => other,
259 }
260 }
261}
262
263impl RecordBatchStream for BindZeroLengthPathStream {
264 fn schema(&self) -> SchemaRef {
265 self.schema.clone()
266 }
267}