uni_query/query/df_graph/
ext_id_lookup.rs1use crate::query::df_graph::GraphExecutionContext;
18use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
19use arrow_array::builder::StringBuilder;
20use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
21use arrow_schema::{DataType, Field, Schema, SchemaRef};
22use datafusion::common::Result as DFResult;
23use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
24use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
26use futures::Stream;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use uni_common::core::id::Vid;
34
35pub struct GraphExtIdLookupExec {
40 graph_ctx: Arc<GraphExecutionContext>,
42
43 variable: String,
45
46 ext_id: String,
48
49 projected_properties: Vec<String>,
51
52 optional: bool,
54
55 schema: SchemaRef,
57
58 properties: PlanProperties,
60
61 metrics: ExecutionPlanMetricsSet,
63}
64
65impl fmt::Debug for GraphExtIdLookupExec {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_struct("GraphExtIdLookupExec")
68 .field("variable", &self.variable)
69 .field("ext_id", &self.ext_id)
70 .field("projected_properties", &self.projected_properties)
71 .field("optional", &self.optional)
72 .finish()
73 }
74}
75
76impl GraphExtIdLookupExec {
77 pub fn new(
79 graph_ctx: Arc<GraphExecutionContext>,
80 variable: impl Into<String>,
81 ext_id: impl Into<String>,
82 projected_properties: Vec<String>,
83 optional: bool,
84 ) -> Self {
85 let variable = variable.into();
86 let ext_id = ext_id.into();
87
88 let schema = Self::build_schema(&variable, &projected_properties);
90 let properties = compute_plan_properties(schema.clone());
91
92 Self {
93 graph_ctx,
94 variable,
95 ext_id,
96 projected_properties,
97 optional,
98 schema,
99 properties,
100 metrics: ExecutionPlanMetricsSet::new(),
101 }
102 }
103
104 fn build_schema(variable: &str, properties: &[String]) -> SchemaRef {
106 let mut fields = vec![
107 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
108 Field::new(format!("{}.ext_id", variable), DataType::Utf8, false),
109 Field::new(format!("{}._label", variable), DataType::Utf8, false),
110 ];
111
112 for prop in properties {
114 let col_name = format!("{}.{}", variable, prop);
115 fields.push(Field::new(&col_name, DataType::Utf8, true));
116 }
117
118 Arc::new(Schema::new(fields))
119 }
120}
121
122impl DisplayAs for GraphExtIdLookupExec {
123 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 write!(
125 f,
126 "GraphExtIdLookupExec: ext_id={}, variable={}, optional={}",
127 self.ext_id, self.variable, self.optional
128 )
129 }
130}
131
132impl ExecutionPlan for GraphExtIdLookupExec {
133 fn name(&self) -> &str {
134 "GraphExtIdLookupExec"
135 }
136
137 fn as_any(&self) -> &dyn Any {
138 self
139 }
140
141 fn schema(&self) -> SchemaRef {
142 self.schema.clone()
143 }
144
145 fn properties(&self) -> &PlanProperties {
146 &self.properties
147 }
148
149 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
150 vec![]
151 }
152
153 fn with_new_children(
154 self: Arc<Self>,
155 children: Vec<Arc<dyn ExecutionPlan>>,
156 ) -> DFResult<Arc<dyn ExecutionPlan>> {
157 if !children.is_empty() {
158 return Err(datafusion::error::DataFusionError::Internal(
159 "GraphExtIdLookupExec has no children".to_string(),
160 ));
161 }
162 Ok(self)
163 }
164
165 fn execute(
166 &self,
167 partition: usize,
168 _context: Arc<TaskContext>,
169 ) -> DFResult<SendableRecordBatchStream> {
170 let metrics = BaselineMetrics::new(&self.metrics, partition);
171
172 Ok(Box::pin(ExtIdLookupStream::new(
173 self.graph_ctx.clone(),
174 self.variable.clone(),
175 self.ext_id.clone(),
176 self.projected_properties.clone(),
177 self.optional,
178 self.schema.clone(),
179 metrics,
180 )))
181 }
182
183 fn metrics(&self) -> Option<MetricsSet> {
184 Some(self.metrics.clone_inner())
185 }
186}
187
188enum ExtIdLookupState {
190 Init,
192 Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
194 Done,
196}
197
198struct ExtIdLookupStream {
200 graph_ctx: Arc<GraphExecutionContext>,
202
203 variable: String,
205
206 ext_id: String,
208
209 properties: Vec<String>,
211
212 optional: bool,
214
215 schema: SchemaRef,
217
218 state: ExtIdLookupState,
220
221 metrics: BaselineMetrics,
223}
224
225impl ExtIdLookupStream {
226 fn new(
227 graph_ctx: Arc<GraphExecutionContext>,
228 variable: String,
229 ext_id: String,
230 properties: Vec<String>,
231 optional: bool,
232 schema: SchemaRef,
233 metrics: BaselineMetrics,
234 ) -> Self {
235 Self {
236 graph_ctx,
237 variable,
238 ext_id,
239 properties,
240 optional,
241 schema,
242 state: ExtIdLookupState::Init,
243 metrics,
244 }
245 }
246}
247
248impl Stream for ExtIdLookupStream {
249 type Item = DFResult<RecordBatch>;
250
251 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252 loop {
253 let state = std::mem::replace(&mut self.state, ExtIdLookupState::Done);
254
255 match state {
256 ExtIdLookupState::Init => {
257 let graph_ctx = self.graph_ctx.clone();
259 let variable = self.variable.clone();
260 let ext_id = self.ext_id.clone();
261 let properties = self.properties.clone();
262 let optional = self.optional;
263 let schema = self.schema.clone();
264
265 let fut = async move {
266 graph_ctx.check_timeout().map_err(|e| {
268 datafusion::error::DataFusionError::Execution(e.to_string())
269 })?;
270
271 execute_lookup(
272 &graph_ctx,
273 &variable,
274 &ext_id,
275 &properties,
276 optional,
277 &schema,
278 )
279 .await
280 };
281
282 self.state = ExtIdLookupState::Executing(Box::pin(fut));
283 }
285 ExtIdLookupState::Executing(mut fut) => match fut.as_mut().poll(cx) {
286 Poll::Ready(Ok(batch)) => {
287 self.state = ExtIdLookupState::Done;
288 self.metrics
289 .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
290 return Poll::Ready(batch.map(Ok));
291 }
292 Poll::Ready(Err(e)) => {
293 self.state = ExtIdLookupState::Done;
294 return Poll::Ready(Some(Err(e)));
295 }
296 Poll::Pending => {
297 self.state = ExtIdLookupState::Executing(fut);
298 return Poll::Pending;
299 }
300 },
301 ExtIdLookupState::Done => {
302 return Poll::Ready(None);
303 }
304 }
305 }
306 }
307}
308
309impl RecordBatchStream for ExtIdLookupStream {
310 fn schema(&self) -> SchemaRef {
311 self.schema.clone()
312 }
313}
314
315async fn execute_lookup(
317 graph_ctx: &GraphExecutionContext,
318 variable: &str,
319 ext_id: &str,
320 properties: &[String],
321 optional: bool,
322 schema: &SchemaRef,
323) -> DFResult<Option<RecordBatch>> {
324 let storage = graph_ctx.storage();
325
326 let found_vid = storage
328 .find_vertex_by_ext_id(ext_id)
329 .await
330 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
331
332 let Some(vid) = found_vid else {
333 if optional {
335 return Ok(Some(build_null_row(variable, properties, schema)?));
336 }
337 return Ok(Some(RecordBatch::new_empty(schema.clone())));
338 };
339
340 let property_manager = graph_ctx.property_manager();
342 let query_ctx = graph_ctx.query_context();
343
344 let props_opt = property_manager
345 .get_all_vertex_props_with_ctx(vid, Some(&query_ctx))
346 .await
347 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
348
349 let Some(props) = props_opt else {
350 if optional {
352 return Ok(Some(build_null_row(variable, properties, schema)?));
353 }
354 return Ok(Some(RecordBatch::new_empty(schema.clone())));
355 };
356
357 let labels = storage
359 .find_vertex_labels_by_vid(vid)
360 .await
361 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
362 .unwrap_or_default();
363
364 let label_name = labels
365 .first()
366 .cloned()
367 .unwrap_or_else(|| "Unknown".to_string());
368
369 let batch = build_result_row(
371 vid,
372 ext_id,
373 &label_name,
374 &props,
375 variable,
376 properties,
377 schema,
378 )?;
379 Ok(Some(batch))
380}
381
382fn build_null_row(
384 _variable: &str,
385 _properties: &[String],
386 schema: &SchemaRef,
387) -> DFResult<RecordBatch> {
388 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
390
391 for field in schema.fields().iter() {
393 match field.data_type() {
394 DataType::UInt64 => {
395 columns.push(Arc::new(arrow_array::UInt64Array::from(vec![
396 None as Option<u64>,
397 ])));
398 }
399 DataType::Utf8 => {
400 let mut builder = StringBuilder::new();
401 builder.append_null();
402 columns.push(Arc::new(builder.finish()));
403 }
404 _ => {
405 let mut builder = StringBuilder::new();
406 builder.append_null();
407 columns.push(Arc::new(builder.finish()));
408 }
409 }
410 }
411
412 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
413}
414
415fn build_result_row(
417 vid: Vid,
418 ext_id: &str,
419 label: &str,
420 props: &HashMap<String, uni_common::Value>,
421 _variable: &str,
422 properties: &[String],
423 schema: &SchemaRef,
424) -> DFResult<RecordBatch> {
425 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
426
427 columns.push(Arc::new(UInt64Array::from(vec![vid.as_u64()])));
429
430 let mut ext_id_builder = StringBuilder::new();
432 ext_id_builder.append_value(ext_id);
433 columns.push(Arc::new(ext_id_builder.finish()));
434
435 let mut label_builder = StringBuilder::new();
437 label_builder.append_value(label);
438 columns.push(Arc::new(label_builder.finish()));
439
440 for prop in properties {
442 let mut builder = StringBuilder::new();
443 if let Some(val) = props.get(prop) {
444 match val {
445 uni_common::Value::String(s) => builder.append_value(s),
446 uni_common::Value::Null => builder.append_null(),
447 other => builder.append_value(other.to_string()),
448 }
449 } else {
450 builder.append_null();
451 }
452 columns.push(Arc::new(builder.finish()));
453 }
454
455 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_schema() {
        let schema =
            GraphExtIdLookupExec::build_schema("n", &["name".to_string(), "age".to_string()]);

        assert_eq!(schema.fields().len(), 5);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n.ext_id");
        assert_eq!(schema.field(2).name(), "n._label");
        assert_eq!(schema.field(3).name(), "n.name");
        assert_eq!(schema.field(4).name(), "n.age");
    }

    /// Identity columns are typed and non-nullable; property columns are nullable Utf8.
    #[test]
    fn test_build_schema_types_and_nullability() {
        let schema = GraphExtIdLookupExec::build_schema("m", &["city".to_string()]);

        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).data_type(), &DataType::Utf8);
        assert!(!schema.field(2).is_nullable());

        assert_eq!(schema.field(3).name(), "m.city");
        assert_eq!(schema.field(3).data_type(), &DataType::Utf8);
        assert!(schema.field(3).is_nullable());
    }

    /// With no projected properties, only the three identity columns remain.
    #[test]
    fn test_build_schema_without_properties() {
        let schema = GraphExtIdLookupExec::build_schema("v", &[]);

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(2).name(), "v._label");
    }

    /// A nullable schema yields exactly one all-null row.
    #[test]
    fn test_build_null_row_on_nullable_schema() {
        let schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let batch = build_null_row("n", &["name".to_string()], &schema).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);
        assert!(batch.columns().iter().all(|c| c.null_count() == 1));
    }
}