uni_query/query/df_graph/
bind_fixed_path.rs1use super::GraphExecutionContext;
13use super::common::{arrow_err, compute_plan_properties, extract_column_value};
14use arrow_array::builder::{
15 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
16};
17use arrow_array::{ArrayRef, RecordBatch};
18use arrow_schema::SchemaRef;
19use datafusion::common::Result as DFResult;
20use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
21use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
22use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
23use futures::{Stream, StreamExt};
24use std::any::Any;
25use std::fmt;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use uni_common::core::id::{Eid, Vid};
30
31pub struct BindFixedPathExec {
37 input: Arc<dyn ExecutionPlan>,
39
40 node_variables: Vec<String>,
42
43 edge_variables: Vec<String>,
45
46 path_variable: String,
48
49 graph_ctx: Arc<GraphExecutionContext>,
51
52 schema: SchemaRef,
54
55 properties: PlanProperties,
57
58 metrics: ExecutionPlanMetricsSet,
60}
61
62impl fmt::Debug for BindFixedPathExec {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("BindFixedPathExec")
65 .field("node_variables", &self.node_variables)
66 .field("edge_variables", &self.edge_variables)
67 .field("path_variable", &self.path_variable)
68 .finish()
69 }
70}
71
72impl BindFixedPathExec {
73 pub fn new(
74 input: Arc<dyn ExecutionPlan>,
75 node_variables: Vec<String>,
76 edge_variables: Vec<String>,
77 path_variable: String,
78 graph_ctx: Arc<GraphExecutionContext>,
79 ) -> Self {
80 let schema = super::common::extend_schema_with_path(input.schema(), &path_variable);
81 let properties = compute_plan_properties(schema.clone());
82
83 Self {
84 input,
85 node_variables,
86 edge_variables,
87 path_variable,
88 graph_ctx,
89 schema,
90 properties,
91 metrics: ExecutionPlanMetricsSet::new(),
92 }
93 }
94}
95
96impl DisplayAs for BindFixedPathExec {
97 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 write!(
99 f,
100 "BindFixedPathExec: {} = ({}) via [{}]",
101 self.path_variable,
102 self.node_variables.join(", "),
103 self.edge_variables.join(", "),
104 )
105 }
106}
107
108impl ExecutionPlan for BindFixedPathExec {
109 fn name(&self) -> &str {
110 "BindFixedPathExec"
111 }
112
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn schema(&self) -> SchemaRef {
118 self.schema.clone()
119 }
120
121 fn properties(&self) -> &PlanProperties {
122 &self.properties
123 }
124
125 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
126 vec![&self.input]
127 }
128
129 fn with_new_children(
130 self: Arc<Self>,
131 children: Vec<Arc<dyn ExecutionPlan>>,
132 ) -> DFResult<Arc<dyn ExecutionPlan>> {
133 if children.len() != 1 {
134 return Err(datafusion::error::DataFusionError::Plan(
135 "BindFixedPathExec requires exactly one child".to_string(),
136 ));
137 }
138
139 Ok(Arc::new(Self::new(
140 children[0].clone(),
141 self.node_variables.clone(),
142 self.edge_variables.clone(),
143 self.path_variable.clone(),
144 self.graph_ctx.clone(),
145 )))
146 }
147
148 fn execute(
149 &self,
150 partition: usize,
151 context: Arc<TaskContext>,
152 ) -> DFResult<SendableRecordBatchStream> {
153 let input_stream = self.input.execute(partition, context)?;
154 let metrics = BaselineMetrics::new(&self.metrics, partition);
155
156 Ok(Box::pin(BindFixedPathStream {
157 input: input_stream,
158 node_variables: self.node_variables.clone(),
159 edge_variables: self.edge_variables.clone(),
160 schema: self.schema.clone(),
161 graph_ctx: self.graph_ctx.clone(),
162 metrics,
163 }))
164 }
165
166 fn metrics(&self) -> Option<MetricsSet> {
167 Some(self.metrics.clone_inner())
168 }
169}
170
171struct BindFixedPathStream {
173 input: SendableRecordBatchStream,
174 node_variables: Vec<String>,
175 edge_variables: Vec<String>,
176 schema: SchemaRef,
177 graph_ctx: Arc<GraphExecutionContext>,
178 metrics: BaselineMetrics,
179}
180
181impl BindFixedPathStream {
182 fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
183 let num_rows = batch.num_rows();
184 let query_ctx = self.graph_ctx.query_context();
185
186 let node_struct_fields = super::common::node_struct_fields();
188 let edge_struct_fields = super::common::edge_struct_fields();
189
190 let mut nodes_builder = ListBuilder::new(StructBuilder::new(
191 node_struct_fields,
192 vec![
193 Box::new(UInt64Builder::new()),
194 Box::new(ListBuilder::new(StringBuilder::new())),
195 Box::new(LargeBinaryBuilder::new()),
196 ],
197 ));
198 let mut rels_builder = ListBuilder::new(StructBuilder::from_fields(
199 edge_struct_fields,
200 num_rows * self.edge_variables.len(),
201 ));
202 let mut path_validity = Vec::with_capacity(num_rows);
203
204 for row_idx in 0..num_rows {
205 let row_has_missing_node = self.node_variables.iter().any(|node_var| {
207 let vid_col_name = format!("{}._vid", node_var);
208 extract_column_value::<arrow_array::UInt64Array, u64>(
209 &batch,
210 &vid_col_name,
211 row_idx,
212 |arr, i| arr.value(i),
213 )
214 .is_none()
215 });
216 let row_has_missing_edge = self.edge_variables.iter().any(|edge_var| {
217 let eid_col_name = if edge_var.starts_with("__eid_to_") {
218 edge_var.clone()
219 } else {
220 format!("{}._eid", edge_var)
221 };
222 extract_column_value::<arrow_array::UInt64Array, u64>(
223 &batch,
224 &eid_col_name,
225 row_idx,
226 |arr, i| arr.value(i),
227 )
228 .is_none()
229 });
230
231 if row_has_missing_node || row_has_missing_edge {
232 nodes_builder.append(false);
233 rels_builder.append(false);
234 path_validity.push(false);
235 continue;
236 }
237
238 for node_var in &self.node_variables {
240 let vid_col_name = format!("{}._vid", node_var);
241
242 let vid: Option<Vid> = extract_column_value(
243 &batch,
244 &vid_col_name,
245 row_idx,
246 |arr: &arrow_array::UInt64Array, i| Vid::from(arr.value(i)),
247 );
248
249 super::common::append_node_to_struct_optional(
250 nodes_builder.values(),
251 vid,
252 &query_ctx,
253 );
254 }
255 nodes_builder.append(true);
256
257 for (edge_idx, edge_var) in self.edge_variables.iter().enumerate() {
260 let eid_col_name = if edge_var.starts_with("__eid_to_") {
263 edge_var.clone()
264 } else {
265 format!("{}._eid", edge_var)
266 };
267
268 let eid: Option<Eid> = extract_column_value(
269 &batch,
270 &eid_col_name,
271 row_idx,
272 |arr: &arrow_array::UInt64Array, i| Eid::from(arr.value(i)),
273 );
274
275 let batch_type_name: Option<String> = if !edge_var.starts_with("__eid_to_") {
279 let type_col_name = format!("{}._type", edge_var);
280 extract_column_value(
281 &batch,
282 &type_col_name,
283 row_idx,
284 |arr: &arrow_array::StringArray, i| arr.value(i).to_string(),
285 )
286 } else {
287 None
288 };
289
290 let src_vid = self
292 .node_variables
293 .get(edge_idx)
294 .and_then(|nv| {
295 let col = format!("{}._vid", nv);
296 extract_column_value::<arrow_array::UInt64Array, u64>(
297 &batch,
298 &col,
299 row_idx,
300 |arr, i| arr.value(i),
301 )
302 })
303 .unwrap_or(0);
304 let dst_vid = self
305 .node_variables
306 .get(edge_idx + 1)
307 .and_then(|nv| {
308 let col = format!("{}._vid", nv);
309 extract_column_value::<arrow_array::UInt64Array, u64>(
310 &batch,
311 &col,
312 row_idx,
313 |arr, i| arr.value(i),
314 )
315 })
316 .unwrap_or(0);
317
318 super::common::append_edge_to_struct_optional(
319 rels_builder.values(),
320 eid,
321 src_vid,
322 dst_vid,
323 batch_type_name,
324 &query_ctx,
325 );
326 }
327 rels_builder.append(true);
328 path_validity.push(true);
329 }
330
331 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
332 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
333
334 let path_array =
335 super::common::build_path_struct_array(nodes_array, rels_array, path_validity)?;
336
337 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
338 columns.push(Arc::new(path_array));
339
340 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
341 }
342}
343
344impl Stream for BindFixedPathStream {
345 type Item = DFResult<RecordBatch>;
346
347 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
348 match self.input.poll_next_unpin(cx) {
349 Poll::Ready(Some(Ok(batch))) => {
350 let _timer = self.metrics.elapsed_compute().timer();
351 let result = self.process_batch(batch);
352 Poll::Ready(Some(result))
353 }
354 other => other,
355 }
356 }
357}
358
359impl RecordBatchStream for BindFixedPathStream {
360 fn schema(&self) -> SchemaRef {
361 self.schema.clone()
362 }
363}