uni_query/query/df_graph/
bind_fixed_path.rs1use super::GraphExecutionContext;
13use super::common::{arrow_err, compute_plan_properties, extract_column_value};
14use arrow_array::builder::{
15 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
16};
17use arrow_array::{ArrayRef, RecordBatch};
18use arrow_schema::SchemaRef;
19use datafusion::common::Result as DFResult;
20use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
21use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
22use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
23use futures::{Stream, StreamExt};
24use std::any::Any;
25use std::fmt;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use uni_common::core::id::{Eid, Vid};
30
31pub struct BindFixedPathExec {
37 input: Arc<dyn ExecutionPlan>,
39
40 node_variables: Vec<String>,
42
43 edge_variables: Vec<String>,
45
46 path_variable: String,
48
49 graph_ctx: Arc<GraphExecutionContext>,
51
52 schema: SchemaRef,
54
55 properties: Arc<PlanProperties>,
57
58 metrics: ExecutionPlanMetricsSet,
60}
61
62impl fmt::Debug for BindFixedPathExec {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("BindFixedPathExec")
65 .field("node_variables", &self.node_variables)
66 .field("edge_variables", &self.edge_variables)
67 .field("path_variable", &self.path_variable)
68 .finish()
69 }
70}
71
72impl BindFixedPathExec {
73 pub fn new(
74 input: Arc<dyn ExecutionPlan>,
75 node_variables: Vec<String>,
76 edge_variables: Vec<String>,
77 path_variable: String,
78 graph_ctx: Arc<GraphExecutionContext>,
79 ) -> Self {
80 let schema = super::common::extend_schema_with_path(input.schema(), &path_variable);
81 let properties = compute_plan_properties(schema.clone());
82
83 Self {
84 input,
85 node_variables,
86 edge_variables,
87 path_variable,
88 graph_ctx,
89 schema,
90 properties,
91 metrics: ExecutionPlanMetricsSet::new(),
92 }
93 }
94}
95
96impl DisplayAs for BindFixedPathExec {
97 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 write!(
99 f,
100 "BindFixedPathExec: {} = ({}) via [{}]",
101 self.path_variable,
102 self.node_variables.join(", "),
103 self.edge_variables.join(", "),
104 )
105 }
106}
107
108impl ExecutionPlan for BindFixedPathExec {
109 fn name(&self) -> &str {
110 "BindFixedPathExec"
111 }
112
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn schema(&self) -> SchemaRef {
118 self.schema.clone()
119 }
120
121 fn properties(&self) -> &Arc<PlanProperties> {
122 &self.properties
123 }
124
125 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
126 vec![&self.input]
127 }
128
129 fn with_new_children(
130 self: Arc<Self>,
131 children: Vec<Arc<dyn ExecutionPlan>>,
132 ) -> DFResult<Arc<dyn ExecutionPlan>> {
133 if children.len() != 1 {
134 return Err(datafusion::error::DataFusionError::Plan(
135 "BindFixedPathExec requires exactly one child".to_string(),
136 ));
137 }
138
139 Ok(Arc::new(Self::new(
140 children[0].clone(),
141 self.node_variables.clone(),
142 self.edge_variables.clone(),
143 self.path_variable.clone(),
144 self.graph_ctx.clone(),
145 )))
146 }
147
148 fn execute(
149 &self,
150 partition: usize,
151 context: Arc<TaskContext>,
152 ) -> DFResult<SendableRecordBatchStream> {
153 let input_stream = self.input.execute(partition, context)?;
154 let metrics = BaselineMetrics::new(&self.metrics, partition);
155
156 Ok(Box::pin(BindFixedPathStream {
157 input: input_stream,
158 node_variables: self.node_variables.clone(),
159 edge_variables: self.edge_variables.clone(),
160 schema: self.schema.clone(),
161 graph_ctx: self.graph_ctx.clone(),
162 metrics,
163 }))
164 }
165
166 fn metrics(&self) -> Option<MetricsSet> {
167 Some(self.metrics.clone_inner())
168 }
169}
170
171struct BindFixedPathStream {
173 input: SendableRecordBatchStream,
174 node_variables: Vec<String>,
175 edge_variables: Vec<String>,
176 schema: SchemaRef,
177 graph_ctx: Arc<GraphExecutionContext>,
178 metrics: BaselineMetrics,
179}
180
181impl BindFixedPathStream {
182 fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
183 let num_rows = batch.num_rows();
184 let query_ctx = self.graph_ctx.query_context();
185
186 let node_struct_fields = super::common::node_struct_fields();
188 let edge_struct_fields = super::common::edge_struct_fields();
189
190 let mut nodes_builder = ListBuilder::new(StructBuilder::new(
191 node_struct_fields,
192 vec![
193 Box::new(UInt64Builder::new()),
194 Box::new(ListBuilder::new(StringBuilder::new())),
195 Box::new(LargeBinaryBuilder::new()),
196 ],
197 ));
198 let mut rels_builder = ListBuilder::new(StructBuilder::from_fields(
199 edge_struct_fields,
200 num_rows * self.edge_variables.len(),
201 ));
202 let mut path_validity = Vec::with_capacity(num_rows);
203
204 for row_idx in 0..num_rows {
205 let row_has_missing_node = self.node_variables.iter().any(|node_var| {
207 let vid_col_name = format!("{}._vid", node_var);
208 extract_column_value::<arrow_array::UInt64Array, u64>(
209 &batch,
210 &vid_col_name,
211 row_idx,
212 |arr, i| arr.value(i),
213 )
214 .is_none()
215 });
216 let row_has_missing_edge = self.edge_variables.iter().any(|edge_var| {
217 let eid_col_name = if edge_var.starts_with("__eid_to_") {
218 edge_var.clone()
219 } else {
220 format!("{}._eid", edge_var)
221 };
222 extract_column_value::<arrow_array::UInt64Array, u64>(
223 &batch,
224 &eid_col_name,
225 row_idx,
226 |arr, i| arr.value(i),
227 )
228 .is_none()
229 });
230
231 if row_has_missing_node || row_has_missing_edge {
232 nodes_builder.append(false);
233 rels_builder.append(false);
234 path_validity.push(false);
235 continue;
236 }
237
238 for node_var in &self.node_variables {
240 let vid_col_name = format!("{}._vid", node_var);
241
242 let vid: Option<Vid> = extract_column_value(
243 &batch,
244 &vid_col_name,
245 row_idx,
246 |arr: &arrow_array::UInt64Array, i| Vid::from(arr.value(i)),
247 );
248
249 super::common::append_node_to_struct_optional(
250 nodes_builder.values(),
251 vid,
252 &query_ctx,
253 );
254 }
255 nodes_builder.append(true);
256
257 for (edge_idx, edge_var) in self.edge_variables.iter().enumerate() {
260 let eid_col_name = if edge_var.starts_with("__eid_to_") {
263 edge_var.clone()
264 } else {
265 format!("{}._eid", edge_var)
266 };
267
268 let eid: Option<Eid> = extract_column_value(
269 &batch,
270 &eid_col_name,
271 row_idx,
272 |arr: &arrow_array::UInt64Array, i| Eid::from(arr.value(i)),
273 );
274
275 let batch_type_name: Option<String> = if !edge_var.starts_with("__eid_to_") {
279 let type_col_name = format!("{}._type", edge_var);
280 extract_column_value(
281 &batch,
282 &type_col_name,
283 row_idx,
284 |arr: &arrow_array::StringArray, i| arr.value(i).to_string(),
285 )
286 } else {
287 None
288 };
289
290 let adjacent = |idx: usize| {
301 self.node_variables
302 .get(idx)
303 .and_then(|nv| {
304 let col = format!("{}._vid", nv);
305 extract_column_value::<arrow_array::UInt64Array, u64>(
306 &batch,
307 &col,
308 row_idx,
309 |arr, i| arr.value(i),
310 )
311 })
312 .unwrap_or(0)
313 };
314 let traversal_src = adjacent(edge_idx);
315 let traversal_dst = adjacent(edge_idx + 1);
316
317 let (src_vid, dst_vid) = match eid {
318 Some(e) => {
319 let edge_type_ids: Vec<u32> = batch_type_name
320 .as_deref()
321 .and_then(|name| {
322 self.graph_ctx
323 .storage()
324 .schema_manager()
325 .schema()
326 .edge_type_id_by_name(name)
327 })
328 .or_else(|| {
329 uni_store::runtime::l0_visibility::get_edge_type(e, &query_ctx)
330 .and_then(|name| {
331 self.graph_ctx
332 .storage()
333 .schema_manager()
334 .schema()
335 .edge_type_id_by_name(&name)
336 })
337 })
338 .map(|id| vec![id])
339 .unwrap_or_default();
340 self.graph_ctx.resolve_stored_edge_endpoints(
341 e,
342 uni_common::core::id::Vid::from(traversal_src),
343 uni_common::core::id::Vid::from(traversal_dst),
344 &edge_type_ids,
345 )
346 }
347 None => (traversal_src, traversal_dst),
348 };
349
350 super::common::append_edge_to_struct_optional(
351 rels_builder.values(),
352 eid,
353 src_vid,
354 dst_vid,
355 batch_type_name,
356 &query_ctx,
357 );
358 }
359 rels_builder.append(true);
360 path_validity.push(true);
361 }
362
363 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
364 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
365
366 let path_array =
367 super::common::build_path_struct_array(nodes_array, rels_array, path_validity)?;
368
369 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
370 columns.push(Arc::new(path_array));
371
372 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
373 }
374}
375
376impl Stream for BindFixedPathStream {
377 type Item = DFResult<RecordBatch>;
378
379 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
380 match self.input.poll_next_unpin(cx) {
381 Poll::Ready(Some(Ok(batch))) => {
382 let _timer = self.metrics.elapsed_compute().timer();
383 let result = self.process_batch(batch);
384 if let Ok(ref b) = result {
385 self.metrics.record_output(b.num_rows());
386 }
387 Poll::Ready(Some(result))
388 }
389 other => other,
390 }
391 }
392}
393
394impl RecordBatchStream for BindFixedPathStream {
395 fn schema(&self) -> SchemaRef {
396 self.schema.clone()
397 }
398}